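"""DoH (DNS-over-HTTPS) service prober.

Reads candidate IP/port seeds from a CSV file or a Kafka topic, tries to
establish a TLS connection using HTTP/2 first and HTTP/1.1 as a fallback,
then issues DoH queries (GET and POST, application/dns-message) against a
set of candidate hostnames and URL paths. Certificates, successful DoH
responses and failures are written either to CSV or to Kafka topics,
depending on the settings in config.ini.
"""
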
from util.util_http import build_dns_query, make_ssl_context, get_domain_from_cert
from dns.message import from_wire
import dns
from dns.name import BadLabelType
from dns.exception import FormError
import time
import configparser
import pandas as pd
from util.util_kafka import make_kafka_producer, kafka_topic_fail, kafka_topic_cert, kafka_topic_suc, make_seed_consumer
from hyper import HTTP20Connection, HTTP11Connection
from hyper.http20.exceptions import ConnectionError, StreamResetError
import socket, h2.exceptions, ssl
from util.concurrent.futures import ThreadPoolExecutor
import threading
from util.thread_timeout import time_limited
import os
import json
from hyper.http20.response import HTTPHeaderMap
from hyper.http11.parser import ParseError
import zlib
import sys
import csv
from _csv import writer

# Error messages matched in except handlers; these cases need special treatment
NOT_SUPPORT_FOR_HTTP2 = "No suitable protocol found"
CERTIFICATE_VERIFY_FAILED = "[SSL: CERTIFICATE_VERIFY_FAILED]"

# HTTP connection states
UN_CONNECTED = 0
CONNECTED_HTTP1_1 = 1
CONNECTED_HTTP2_0 = 2

# Base timeout value (the probe is limited to TIMEOUT * 10)
TIMEOUT = 2


def write_data(data: dict, kafka_topic: str = ""):
    # Serialize result records through the global writer (csv.writer or Kafka producer);
    # the lock is released even if writing raises.
    with lock:
        if write_model == "csv":
            data["status"] = kafka_topic
            writer.writerow([data.__str__()])
        else:
            try:
                writer.send(kafka_topic, data)
            except Exception:
                ...


def make_doh_request_body(q_name="baidu.com"):
    dns_query = build_dns_query(q_name)
    dns_query_wire_format = dns_query.to_wire()
    return dns_query_wire_format


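# The GET variant of a DoH request carries the same wire-format query as the
# POST body above, base64url-encoded without padding (RFC 8484). The probe
# functions below use the pre-encoded parameter
# "dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB", which is such an encoding of a
# query for baidu.com. A minimal sketch of how an equivalent parameter could be
# derived from make_doh_request_body() (this helper is illustrative and is not
# called by the original code; the DNS transaction ID will generally differ):
def encode_dns_get_param(q_name="baidu.com"):
    import base64
    wire = make_doh_request_body(q_name)  # DNS query in wire format
    return base64.urlsafe_b64encode(wire).decode().rstrip("=")

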
def make_doh_request_header(hostname=None, conn_status=CONNECTED_HTTP2_0):
    headers = {
        "content-type": "application/dns-message",
        "accept": "application/dns-message",
        "user-agent": 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36 '
    }
    if hostname:
        if conn_status == CONNECTED_HTTP2_0:
            headers[":authority"] = hostname
        elif conn_status == CONNECTED_HTTP1_1:
            headers["host"] = hostname
        else:
            ...
    return headers


def decode_header(header: HTTPHeaderMap):
    # print(header.__str__())
    ret = {}
    for key, value in header.items():
        key = key.decode()
        try:
            value = value.decode()
        except UnicodeDecodeError:
            key = "UnicodeDecodeError"
            value = value.__str__()  # keep a string so repeated keys can still be joined below
        if key not in ret.keys():  # merge repeated keys into one multi-value entry
            ret[key] = value
        else:
            ret[key] += (";" + value)
    return ret


def probe_http2_0(conn: HTTP20Connection, host_group, path_group, methods):
    stream_ids = {}
    for hostname in host_group:
        headers = make_doh_request_header(hostname)
        for path in path_group:
            for method in methods:
                if len(stream_ids.keys()) >= 100:  # limit to 100 probes per IP/port pair
                    continue
                if method == "GET":
                    # the dns= parameter is a pre-encoded base64url DNS query for baidu.com
                    full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
                    try:
                        stream_id = conn.request(method=method, url=full_path, headers=headers)
                    except Exception as e:
                        continue
                else:
                    try:
                        stream_id = conn.request(method=method, url=path, body=body, headers=headers)
                    except Exception as e:
                        continue
                stream_ids[stream_id] = (hostname, path, method)

    for stream_id, conn_info in stream_ids.items():
        hostname, path, method = conn_info
        try:
            rep = conn.get_response(stream_id)
        except Exception as e:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            continue
        rep_code = rep.status
        rep_header = decode_header(rep.headers)
        try:
            rep_body = rep.read()
        except Exception as e:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "status_code": rep_code,
                "rep_header": rep_header,
                "info": e.__str__()
            }
            write_data(data, kafka_topic_fail)
            return
        # print(rep_body)
        try:
            rep_body = from_wire(rep_body)
            if rep_body:
                # produce sucInfo to kafka
                data = {
                    "ip": conn.host,
                    "port": conn.port,
                    "host": hostname,
                    "path": path,
                    "method": method,
                    "connect_type": CONNECTED_HTTP2_0,
                    "status_code": rep_code,
                    "rep_header": rep_header,
                    "rep_body": rep_body.__str__()
                }
                # print(rep_body.__str__())
                write_data(data, kafka_topic_suc)
            else:
                # produce failInfo to kafka
                data = {
                    "ip": conn.host,
                    "port": conn.port,
                    "host": hostname,
                    "path": path,
                    "method": method,
                    "connect_type": CONNECTED_HTTP2_0,
                    "status_code": rep_code,
                    "rep_header": rep_header,
                    "info": rep_body.__str__()
                }
                write_data(data, kafka_topic_fail)
        except Exception:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "status_code": rep_code,
                "rep_header": rep_header,
                "info": rep_body.__str__()
            }
            write_data(data, kafka_topic_fail)


def probe_http1_1(conn: HTTP11Connection, host_group, path_group, methods):
    # start_time = time.time()
    for hostname in host_group:
        headers = make_doh_request_header(hostname, CONNECTED_HTTP1_1)
        for path in path_group:
            for method in methods:
                if method == "GET":
                    # the dns= parameter is a pre-encoded base64url DNS query for baidu.com
                    full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
                    try:
                        conn.request(method=method, url=full_path, headers=headers)
                    except Exception as e:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "info": e.__str__(),
                        }
                        write_data(data, kafka_topic_fail)
                        # print("GET request failed:", e.__str__)
                        continue  # skip this probe directly
                else:
                    try:
                        conn.request(method=method, url=path, body=body, headers=headers)
                    except Exception as e:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "info": e.__str__(),
                        }
                        write_data(data, kafka_topic_fail)
                        # print("POST request failed:", e.__str__)
                        continue

                try:
                    rep = conn.get_response()
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "info": e.__str__(),
                    }
                    write_data(data, kafka_topic_fail)
                    # print("get_response failed:", e)
                    continue
                rep_code = rep.status
                rep_header = decode_header(rep.headers)
                try:
                    rep_body = rep.read()
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "status_code": rep_code,
                        "rep_header": rep_header,
                        "rep_body": e.__str__()
                    }
                    write_data(data, kafka_topic_fail)
                    # print("read failed:", e)
                    continue
                # print(rep_body)
                try:
                    rep_body = from_wire(rep_body)
                    # # print(hostname, path, method)
                    # # print(rep_body)
                    if rep_body:
                        # produce sucInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "status_code": rep_code,
                            "rep_header": rep_header,
                            "rep_body": rep_body.__str__()
                        }
                        # print(rep_body.__str__())
                        write_data(data, kafka_topic_suc)
                    else:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "status_code": rep_code,
                            "rep_header": rep_header,
                            "info": None
                        }
                        write_data(data, kafka_topic_fail)
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "status_code": rep_code,
                        "rep_header": rep_header,
                        "info": e.__str__()
                    }
                    write_data(data, kafka_topic_fail)
                    # print("from_wire failed:", e.__str__)


@time_limited(TIMEOUT * 10)
def doh_probe(ip, port, host_group=None, path_group=None):
    verify_mode = ssl.CERT_OPTIONAL
    ctx_2_0 = make_ssl_context()
    ctx_2_0.set_alpn_protocols(['h2'])
    conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
    conn_status = UN_CONNECTED
    try:
        conn.connect()
        conn_status = CONNECTED_HTTP2_0
        # print("HTTP/2 connection established")
    except Exception as e:
        if e.__str__().startswith(NOT_SUPPORT_FOR_HTTP2):  # expected error: server does not support HTTP/2, fall back to HTTP/1.1
            # print("Connection failed: no HTTP/2 support, trying HTTP/1.1")
            ...
        elif e.__str__().startswith(CERTIFICATE_VERIFY_FAILED):  # certificate error: retry with the sslContext set to CERT_NONE
            # print("Certificate verification failed, retrying with CERT_NONE")
            ctx_2_0.verify_mode = ssl.CERT_NONE
            verify_mode = ssl.CERT_NONE
            conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
            try:
                conn.connect()
                conn_status = CONNECTED_HTTP2_0
                # print("HTTP/2 connection established")
            except Exception as e:
                # print("Connection failed: no HTTP/2 support, trying HTTP/1.1")
                ...
        else:
            # print("Connection failed:", e)
            # produce failInfo to kafka
            data = {
                "ip": ip,
                "port": port,
                "connect_type": UN_CONNECTED,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            return

    if conn_status == UN_CONNECTED:
        ctx_1_1 = make_ssl_context()
        ctx_1_1.verify_mode = verify_mode
        conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
        try:
            conn.connect()
            conn_status = CONNECTED_HTTP1_1
            # print("HTTP/1.1 connection established")
        except Exception as e:  # give up if the connection cannot be established over HTTP/1.1 either
            # print("Connection failed:", e)
            # produce failInfo to kafka
            data = {
                "ip": ip,
                "port": port,
                "connect_type": UN_CONNECTED,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            return

    try:
        cert = conn._sock.getpeercert()
        # produce certInfo to kafka
        data = {
            "ip": ip,
            "port": port,
            "certificate": cert.__str__()
        }
        write_data(data, kafka_topic_cert)
    except Exception:
        cert = None

    if host_group:
        host_group += get_domain_from_cert(cert)
    else:
        host_group = get_domain_from_cert(cert)
    host_group += [None, ip]  # host_group always contains at least None and the raw IP address
    host_group = list(set(host_group))  # deduplicate

    default_path_group = ["/dns-query", "/", "/resolve", "/doh", "/doh/family-filter",
                          "/doh/secure-filter", "/query", "/ads", "/uncensored", "/adblock"]
    if path_group:
        path_group += default_path_group
        path_group = list(set(path_group))  # deduplicate
    else:
        path_group = default_path_group

    methods = ["POST", "GET"]

    if conn_status == CONNECTED_HTTP2_0:
        probe_http2_0(conn, host_group, path_group, methods)
    elif conn_status == CONNECTED_HTTP1_1:
        probe_http1_1(conn, host_group, path_group, methods)
    else:
        return

    try:
        conn.close()
    except Exception as e:
        ...


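# A single probe can be invoked directly, e.g. doh_probe("9.9.9.9", 443) (the
# IP and port here are illustrative). Note that doh_probe relies on the
# module-level globals writer, lock, body and write_model, which are set up in
# the __main__ block below, so it is normally driven via run_with_csv_model()
# or run_with_kafka_model().

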
def save_status(count):
    data = {"count": count}
    with open(save_status_file, "w") as fp:
        json.dump(data, fp)


def load_status():
    if not os.path.exists(save_status_file):
        return 0
    with open(save_status_file, "r") as fp:
        a = json.load(fp)
    return a["count"]


def run_with_csv_model():
    chunks = pd.read_csv(seed_filename, iterator=True)
    count = cpu_num * thread_num * union_detect_num
    save_point = load_status()
    count_scan = 0
    while True:
        start_time = time.time()
        data_group = []
        try:
            chunk_data = chunks.get_chunk(count)
        except StopIteration:  # no more seed data
            break
        for data in chunk_data.itertuples():
            if count_scan < save_point:  # this seed has already been probed
                count_scan += 1
                continue
            ip = data.saddr
            port = data.sport
            data_group.append((ip, port))
            count_scan += 1

        with ThreadPoolExecutor(max_workers=thread_num, thread_name_prefix="") as threadPool:
            for data in data_group:
                threadPool.submit(doh_probe, *data)

        if count_scan > save_point:
            save_status(count_scan)

        if len(data_group) != 0:
            end_time = time.time()
            print(time.strftime('%y%m%d%H', time.localtime(time.time())))
            print("CPU cores:", cpu_num, end=" ")
            print("threads per core:", thread_num, end=" ")
            print("probes completed this round:", len(data_group), end=" ")
            print("time elapsed:", end_time - start_time, end=" ")
            print("average probes per second:", len(data_group) / (end_time - start_time))


def run_with_kafka_model():
    consumer = make_seed_consumer()
    print_num = 10000  # print progress after every print_num scanned IPs
    max_queue_size = thread_num * 10  # task queue holds 10x the number of worker threads
    with ThreadPoolExecutor(max_workers=thread_num, max_queue_size=max_queue_size, thread_name_prefix="") as threadPool:
        count = 0
        start_time = time.time()
        for message in consumer:
            data = message.value
            ip = data["ip"]
            port = int(data["port"])
            threadPool.submit(doh_probe, ip, port)
            # print("Consumed seed:", ip, port)

            count += 1
            if count % print_num == 0:
                end_time = time.time()
                print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), end=" ")
                print("CPU cores:", cpu_num, end=" ")
                print("threads per core:", thread_num, end=" ")
                print("probes completed this round:", print_num, end=" ")
                print("time elapsed:", end_time - start_time, end=" ")
                print("average probes per second:", print_num / (end_time - start_time))
                start_time = end_time  # reset the timer so the per-round figures stay accurate


if __name__ == '__main__':
    # Define the root directory so the program is easy to relocate
    root_dir = sys.path[0]

    # Read the configuration file
    config = configparser.ConfigParser()
    config_file = os.path.join(root_dir, "config.ini")
    config.read(config_file)

    cpu_num = config.getint('DetectConfigs', 'cpu_num', fallback=1)  # CPU demand is low; multiprocessing is not used for now
    thread_num = config.getint('DetectConfigs', 'thread_num', fallback=32)
    write_model = config.get('DetectConfigs', 'write_model')
    if write_model == "csv":
        save_filename = os.path.join(root_dir, config.get('DetectConfigs', 'save_filename'))
        writer = csv.writer(open(save_filename, "w", newline=""))
    elif write_model == "kafka":
        writer = make_kafka_producer()
    else:
        print("error write model!")
        sys.exit()

    seed_read_model = config.get('DetectConfigs', 'seed_read_model')
    if seed_read_model == "csv":
        union_detect_num = config.getint('DetectConfigs', 'union_detect_num', fallback=100)  # probes per thread per round
        seed_filename = os.path.join(root_dir, config.get('DetectConfigs', 'filename'))
    elif seed_read_model == "kafka":
        union_detect_num = None
        seed_filename = None
    else:
        print("error seed_read_model!")
        sys.exit()

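    # For reference, a config.ini consistent with the keys read above might look
    # like the following (the values are illustrative assumptions, not the
    # original configuration):
    #
    #   [DetectConfigs]
    #   cpu_num = 1
    #   thread_num = 32
    #   write_model = kafka
    #   save_filename = result.csv
    #   seed_read_model = kafka
    #   union_detect_num = 100
    #   filename = seed.csv
    #
    # save_filename is only used when write_model = csv; union_detect_num and
    # filename are only used when seed_read_model = csv.
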
    # Status file recording probing progress, so a crash does not lead to re-probing the same seeds
    save_status_path = os.path.join(root_dir, "./status")
    save_status_file = os.path.join(save_status_path, "status.json")
    if not os.path.exists(save_status_path):
        os.mkdir(save_status_path)

    lock = threading.Lock()
    body = make_doh_request_body()

    # producers = [None] * thread_num
    print("Starting probe:")
    print("-----", "processes:", cpu_num)
    print("-----", "threads:", thread_num)
    if seed_read_model == "csv":
        print("-----", "probes per thread:", union_detect_num)
        run_with_csv_model()
    elif seed_read_model == "kafka":
        run_with_kafka_model()