from util.util_http import build_dns_query, make_ssl_context, get_domain_from_cert
from dns.message import from_wire
import dns
from dns.name import BadLabelType
from dns.exception import FormError
import time
import configparser
import pandas as pd
from util.util_kafka import make_kafka_producer, kafka_topic_fail, kafka_topic_cert, kafka_topic_suc, make_seed_consumer
from hyper import HTTP20Connection, HTTP11Connection
from hyper.http20.exceptions import ConnectionError, StreamResetError
import socket
import h2.exceptions
import ssl
from util.concurrent.futures import ThreadPoolExecutor
import threading
from util.thread_timeout import time_limited
import os
import json
from hyper.http20.response import HTTPHeaderMap
from hyper.http11.parser import ParseError
import zlib
import sys
import csv
from _csv import writer

# Error messages matched in except blocks; these errors need special-case handling
NOT_SUPPORT_FOR_HTTP2 = "No suitable protocol found"
CERTIFICATE_VERIFY_FAILED = "[SSL: CERTIFICATE_VERIFY_FAILED]"
# HTTP connection types
UN_CONNECTED = 0
CONNECTED_HTTP1_1 = 1
CONNECTED_HTTP2_0 = 2
# Base timeout, in seconds
TIMEOUT = 2


def write_data(data: dict, kafka_topic: str = ""):
    # Serialize writes from all probe threads; the lock is released even if the write raises
    with lock:
        if write_model == "csv":
            data["status"] = kafka_topic
            writer.writerow([data.__str__()])
        else:
            try:
                writer.send(kafka_topic, data)
            except Exception:
                ...


def make_doh_request_body(q_name="baidu.com"):
    dns_query = build_dns_query(q_name)
    dns_query_wire_format = dns_query.to_wire()
    return dns_query_wire_format


def make_doh_request_header(hostname=None, conn_status=CONNECTED_HTTP2_0):
    headers = {
        "content-type": "application/dns-message",
        "accept": "application/dns-message",
        "user-agent": 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36'
    }
    if hostname:
        if conn_status == CONNECTED_HTTP2_0:
            headers[":authority"] = hostname
        elif conn_status == CONNECTED_HTTP1_1:
            headers["host"] = hostname
        else:
            ...
    return headers
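

# The GET probes below use a hard-coded "dns=" query parameter, which is the
# base64url-encoded wire format of a query for baidu.com. A minimal sketch of
# how such a parameter could be derived (illustrative only, not used by the
# probe flow; the helper name is made up, and only the stdlib base64 module is assumed):
def encode_doh_get_param(q_name="baidu.com"):
    """Return an RFC 8484 style base64url 'dns' parameter with padding stripped."""
    import base64
    wire = build_dns_query(q_name).to_wire()
    return base64.urlsafe_b64encode(wire).rstrip(b"=").decode()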


def decode_header(header: HTTPHeaderMap):
    # print(header.__str__())
    ret = {}
    for key, value in header.items():
        key = key.decode()
        try:
            value = value.decode()
        except UnicodeDecodeError:
            # the value could not be decoded; record a marker instead
            value = "UnicodeDecodeError"
        if key not in ret.keys():
            # handle the case of a single key carrying multiple values
            ret[key] = value
        else:
            ret[key] += (";" + value)
    return ret


def probe_http2_0(conn: HTTP20Connection, host_group, path_group, methods):
    stream_ids = {}
    for hostname in host_group:
        headers = make_doh_request_header(hostname)
        for path in path_group:
            for method in methods:
                if len(stream_ids.keys()) >= 100:  # limit probing to 100 requests per IP/port pair
                    continue
                if method == "GET":
                    full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
                    try:
                        stream_id = conn.request(method=method, url=full_path, headers=headers)
                    except Exception as e:
                        continue
                else:
                    try:
                        stream_id = conn.request(method=method, url=path, body=body, headers=headers)
                    except Exception as e:
                        continue
                stream_ids[stream_id] = (hostname, path, method)
    # all requests are multiplexed on the same HTTP/2 connection; collect the responses afterwards
    for stream_id, conn_info in stream_ids.items():
        hostname, path, method = conn_info
        try:
            rep = conn.get_response(stream_id)
        except Exception as e:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            continue
        rep_code = rep.status
        rep_header = decode_header(rep.headers)
        try:
            rep_body = rep.read()
        except Exception as e:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "status_code": rep_code,
                "rep_header": rep_header,
                "info": e.__str__()
            }
            write_data(data, kafka_topic_fail)
            return
        # print(rep_body)
        try:
            rep_body = from_wire(rep_body)
            if rep_body:
                # produce sucInfo to kafka
                data = {
                    "ip": conn.host,
                    "port": conn.port,
                    "host": hostname,
                    "path": path,
                    "method": method,
                    "connect_type": CONNECTED_HTTP2_0,
                    "status_code": rep_code,
                    "rep_header": rep_header,
                    "rep_body": rep_body.__str__()
                }
                # print(rep_body.__str__())
                write_data(data, kafka_topic_suc)
            else:
                # produce failInfo to kafka
                data = {
                    "ip": conn.host,
                    "port": conn.port,
                    "host": hostname,
                    "path": path,
                    "method": method,
                    "connect_type": CONNECTED_HTTP2_0,
                    "status_code": rep_code,
                    "rep_header": rep_header,
                    "info": rep_body.__str__()
                }
                write_data(data, kafka_topic_fail)
        except Exception:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "status_code": rep_code,
                "rep_header": rep_header,
                "info": rep_body.__str__()
            }
            write_data(data, kafka_topic_fail)


def probe_http1_1(conn: HTTP11Connection, host_group, path_group, methods):
    # start_time = time.time()
    for hostname in host_group:
        headers = make_doh_request_header(hostname, CONNECTED_HTTP1_1)
        for path in path_group:
            for method in methods:
                if method == "GET":
                    full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
                    try:
                        conn.request(method=method, url=full_path, headers=headers)
                    except Exception as e:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "info": e.__str__(),
                        }
                        write_data(data, kafka_topic_fail)
                        # print("GET request failed:", e)
                        continue
                else:
                    try:
                        conn.request(method=method, url=path, body=body, headers=headers)
                    except Exception as e:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "info": e.__str__(),
                        }
                        write_data(data, kafka_topic_fail)
                        # print("POST request failed:", e)
                        continue
                try:
                    rep = conn.get_response()
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "info": e.__str__(),
                    }
                    write_data(data, kafka_topic_fail)
                    # print("get_response failed:", e)
                    continue
                rep_code = rep.status
                rep_header = decode_header(rep.headers)
                try:
                    rep_body = rep.read()
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "status_code": rep_code,
                        "rep_header": rep_header,
                        "rep_body": e.__str__()
                    }
                    write_data(data, kafka_topic_fail)
                    # print("read failed:", e)
                    continue
                # print(rep_body)
                try:
                    rep_body = from_wire(rep_body)
                    # print(hostname, path, method)
                    # print(rep_body)
                    if rep_body:
                        # produce sucInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "status_code": rep_code,
                            "rep_header": rep_header,
                            "rep_body": rep_body.__str__()
                        }
                        # print(rep_body.__str__())
                        write_data(data, kafka_topic_suc)
                    else:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "status_code": rep_code,
                            "rep_header": rep_header,
                            "info": None
                        }
                        write_data(data, kafka_topic_fail)
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "status_code": rep_code,
                        "rep_header": rep_header,
                        "info": e.__str__()
                    }
                    write_data(data, kafka_topic_fail)
                    # print("from_wire failed:", e)


@time_limited(TIMEOUT * 10)
def doh_probe(ip, port, host_group=None, path_group=None):
    verify_mode = ssl.CERT_OPTIONAL
    # try HTTP/2 first, negotiated via ALPN
    ctx_2_0 = make_ssl_context()
    ctx_2_0.set_alpn_protocols(['h2'])
    conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
    conn_status = UN_CONNECTED
    try:
        conn.connect()
        conn_status = CONNECTED_HTTP2_0
        # print("HTTP/2 connection established")
    except Exception as e:
        if e.__str__().startswith(NOT_SUPPORT_FOR_HTTP2):
            # expected error; fall back to HTTP/1.1
            # print("Connection failed: server does not support HTTP/2, trying HTTP/1.1")
            ...
        elif e.__str__().startswith(CERTIFICATE_VERIFY_FAILED):
            # certificate error; retry with verify_mode set to CERT_NONE
            # print("Connection failed because of the certificate, retrying with CERT_NONE")
            ctx_2_0.verify_mode = ssl.CERT_NONE
            verify_mode = ssl.CERT_NONE
            conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
            try:
                conn.connect()
                conn_status = CONNECTED_HTTP2_0
                # print("HTTP/2 connection established")
            except Exception as e:
                # print("Connection failed: server does not support HTTP/2, trying HTTP/1.1")
                ...
        else:
            # print("Connection failed:", e)
            # produce failInfo to kafka
            data = {
                "ip": ip,
                "port": port,
                "connect_type": UN_CONNECTED,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            return
    if conn_status == UN_CONNECTED:
        ctx_1_1 = make_ssl_context()
        ctx_1_1.verify_mode = verify_mode
        conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
        try:
            conn.connect()
            conn_status = CONNECTED_HTTP1_1
            # print("HTTP/1.1 connection established")
        except Exception as e:
            # if HTTP/1.1 also fails to connect, give up
            # print("Connection failed:", e)
            # produce failInfo to kafka
            data = {
                "ip": ip,
                "port": port,
                "connect_type": UN_CONNECTED,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            return
    try:
        cert = conn._sock.getpeercert()
        # produce certInfo to kafka
        data = {
            "ip": ip,
            "port": port,
            "certificate": cert.__str__()
        }
        write_data(data, kafka_topic_cert)
    except Exception:
        cert = None
    if host_group:
        host_group += get_domain_from_cert(cert)
    else:
        host_group = get_domain_from_cert(cert)
    host_group += [None, ip]  # host_group always contains at least None and the raw IP address
    host_group = list(set(host_group))  # deduplicate
    default_path_group = ["/dns-query", "/", "/resolve", "/doh", "/doh/family-filter", "/doh/secure-filter",
                          "/query", "/ads", "/uncensored", "/adblock"]
    if path_group:
        path_group += default_path_group
        path_group = list(set(path_group))  # deduplicate
    else:
        path_group = default_path_group
    methods = ["POST", "GET"]
    if conn_status == CONNECTED_HTTP2_0:
        probe_http2_0(conn, host_group, path_group, methods)
    elif conn_status == CONNECTED_HTTP1_1:
        probe_http1_1(conn, host_group, path_group, methods)
    else:
        return
    try:
        conn.close()
    except Exception:
        ...


def save_status(count):
    data = {"count": count}
    with open(save_status_file, "w") as fp:
        json.dump(data, fp)


def load_status():
    if not os.path.exists(save_status_file):
        return 0
    with open(save_status_file, "r") as fp:
        a = json.load(fp)
    return a["count"]


def run_with_csv_model():
    chunks = pd.read_csv(seed_filename, iterator=True)
    count = cpu_num * thread_num * union_detect_num
    save_point = load_status()
    count_scan = 0
    while True:
        start_time = time.time()
        data_group = []
        try:
            chunk_data = chunks.get_chunk(count)
        except StopIteration:
            # end of the seed file; trailing data may be lost here
            break
        for data in chunk_data.itertuples():
            if count_scan < save_point:
                # this seed has already been probed
                count_scan += 1
                continue
            ip = data.saddr
            port = data.sport
            data_group.append((ip, port))
            count_scan += 1
        with ThreadPoolExecutor(max_workers=thread_num, thread_name_prefix="") as threadPool:
            for data in data_group:
                threadPool.submit(doh_probe, *data)
        if count_scan > save_point:
            save_status(count_scan)
        if len(data_group) != 0:
            end_time = time.time()
            print(time.strftime('%y%m%d%H', time.localtime(time.time())))
            print("CPU cores:", cpu_num, end=" ")
            print("threads per core:", thread_num, end=" ")
            print("probes completed this round:", len(data_group), end=" ")
            print("elapsed time (s):", end_time - start_time, end=" ")
            print("average probes per second:", len(data_group) / (end_time - start_time))
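

# run_with_csv_model above expects the seed CSV to expose at least the columns
# it reads (saddr, sport). An illustrative seed file (the values are made up):
#
#   saddr,sport
#   198.51.100.1,443
#   203.0.113.2,8443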


def run_with_kafka_model():
    consumer = make_seed_consumer()
    print_num = 10000  # print progress after every print_num probed IPs
    max_queue_size = thread_num * 10  # the task queue is ten times the number of worker threads
    with ThreadPoolExecutor(max_workers=thread_num, max_queue_size=max_queue_size, thread_name_prefix="") as threadPool:
        count = 0
        start_time = time.time()
        for message in consumer:
            data = message.value
            ip = data["ip"]
            port = int(data["port"])
            threadPool.submit(doh_probe, ip, port)
            # print("consumed seed:", ip, port)
            count += 1
            if count % print_num == 0:
                end_time = time.time()
                print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), end=" ")
                print("CPU cores:", cpu_num, end=" ")
                print("threads per core:", thread_num, end=" ")
                print("probes completed this round:", print_num, end=" ")
                print("elapsed time (s):", end_time - start_time, end=" ")
                print("average probes per second:", print_num / (end_time - start_time))


if __name__ == '__main__':
    # project root directory, so the program can be moved around easily
    root_dir = sys.path[0]
    # read the config file
    config = configparser.ConfigParser()
    config_file = os.path.join(root_dir, "config.ini")
    config.read(config_file)
    cpu_num = config.getint('DetectConfigs', 'cpu_num', fallback=1)  # CPU demand is low; multiprocessing is not used for now
    thread_num = config.getint('DetectConfigs', 'thread_num', fallback=32)
    write_model = config.get('DetectConfigs', 'write_model')
    if write_model == "csv":
        save_filename = os.path.join(root_dir, config.get('DetectConfigs', 'save_filename'))
        writer = csv.writer(open(save_filename, "w", newline=""))
    elif write_model == "kafka":
        writer = make_kafka_producer()
    else:
        print("error: unknown write_model!")
        sys.exit()
    seed_read_model = config.get('DetectConfigs', 'seed_read_model')
    if seed_read_model == "csv":
        union_detect_num = config.getint('DetectConfigs', 'union_detect_num', fallback=100)  # probes per thread per round
        seed_filename = os.path.join(root_dir, config.get('DetectConfigs', 'filename'))
    elif seed_read_model == "kafka":
        union_detect_num = None
        seed_filename = None
    else:
        print("error: unknown seed_read_model!")
        sys.exit()
    # status file recording probe progress, so a crash does not lead to re-probing the same seeds
    save_status_path = os.path.join(root_dir, "./status")
    save_status_file = os.path.join(save_status_path, "status.json")
    if not os.path.exists(save_status_path):
        os.mkdir(save_status_path)
    lock = threading.Lock()
    body = make_doh_request_body()
    # producers = [None] * thread_num
    print("Starting probe:")
    print("-----", "processes:", cpu_num)
    print("-----", "threads:", thread_num)
    if seed_read_model == "csv":
        print("-----", "probes per thread per round:", union_detect_num)
        run_with_csv_model()
    elif seed_read_model == "kafka":
        run_with_kafka_model()
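

# An illustrative config.ini covering the keys read above; the section and key
# names come from the code, the values are assumptions:
#
#   [DetectConfigs]
#   cpu_num = 1
#   thread_num = 32
#   write_model = csv           ; csv or kafka
#   save_filename = result.csv  ; used when write_model = csv
#   seed_read_model = csv       ; csv or kafka
#   filename = seed.csv         ; used when seed_read_model = csv
#   union_detect_num = 100      ; used when seed_read_model = csv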