wujiating-diamondv/main.py
from util.util_http import build_dns_query, make_ssl_context, get_domain_from_cert
from dns.message import from_wire
import dns
from dns.name import BadLabelType
from dns.exception import FormError
import time
import configparser
import pandas as pd
from util.util_kafka import make_kafka_producer, kafka_topic_fail, kafka_topic_cert, kafka_topic_suc, make_seed_consumer
from hyper import HTTP20Connection, HTTP11Connection
from hyper.http20.exceptions import ConnectionError, StreamResetError
import socket, h2.exceptions, ssl
from util.concurrent.futures import ThreadPoolExecutor
import threading
from util.thread_timeout import time_limited
import os
import json
from hyper.http20.response import HTTPHeaderMap
from hyper.http11.parser import ParseError
import zlib
import sys
import csv
from _csv import writer
# Error messages matched in except blocks; these cases need special handling
NOT_SUPPORT_FOR_HTTP2 = "No suitable protocol found"
CERTIFICATE_VERIFY_FAILED = "[SSL: CERTIFICATE_VERIFY_FAILED]"
# HTTP connection states
UN_CONNECTED = 0
CONNECTED_HTTP1_1 = 1
CONNECTED_HTTP2_0 = 2
# base timeout, in seconds
TIMEOUT = 2
def write_data(data: dict, kafka_topic: str = ""):
    # serialize writes from the worker threads: the CSV writer / Kafka producer is shared
    lock.acquire()
    if write_model == "csv":
        data["status"] = kafka_topic
        writer.writerow([data.__str__()])
    else:
        try:
            writer.send(kafka_topic, data)
        except Exception as e:
            ...
    lock.release()
def make_doh_request_body(q_name="baidu.com"):
    dns_query = build_dns_query(q_name)
    dns_query_wire_format = dns_query.to_wire()
    return dns_query_wire_format
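# build_dns_query lives in the project-local util.util_http module, which is not
# shown in this file. Purely as an illustration (an assumption, not the project's
# actual helper), an equivalent query could be built directly with dnspython,
# which this file already uses via dns.message.from_wire:
#
#   import dns.message
#   import dns.rdatatype
#
#   def build_dns_query_sketch(q_name="baidu.com"):
#       # A-record query; make_query sets the RD bit by default
#       return dns.message.make_query(q_name, dns.rdatatype.A)
#
#   wire = build_dns_query_sketch().to_wire()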
def make_doh_request_header(hostname=None, conn_status=CONNECTED_HTTP2_0):
    headers = {
        "content-type": "application/dns-message",
        "accept": "application/dns-message",
        "user-agent": 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36 '
    }
    if hostname:
        if conn_status == CONNECTED_HTTP2_0:
            headers[":authority"] = hostname
        elif conn_status == CONNECTED_HTTP1_1:
            headers["host"] = hostname
        else:
            ...
    return headers
def decode_header(header: HTTPHeaderMap):
    # print(header.__str__())
    ret = {}
    for key, value in header.items():
        key = key.decode()
        try:
            value = value.decode()
        except UnicodeDecodeError:
            key = "UnicodeDecodeError"
        if key not in ret.keys():  # handle a single key carrying multiple values
            ret[key] = value
        else:
            ret[key] += (";" + value)
    return ret
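# Illustrative behaviour (example values, not taken from the original code):
# hyper returns header names and values as bytes, so given headers containing
# b"content-type": b"application/dns-message" plus two b"set-cookie" values
# b"a=1" and b"b=2", decode_header would return
#   {"content-type": "application/dns-message", "set-cookie": "a=1;b=2"}
# i.e. keys/values decoded to str, repeated keys joined with ";".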
def probe_http2_0(conn: HTTP20Connection, host_group, path_group, methods):
    stream_ids = {}
    for hostname in host_group:
        headers = make_doh_request_header(hostname)
        for path in path_group:
            for method in methods:
                if len(stream_ids.keys()) >= 100:  # limit each ip/port pair to at most 100 probes
                    continue
                if method == "GET":
                    full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
                    try:
                        stream_id = conn.request(method=method, url=full_path, headers=headers)
                    except Exception as e:
                        continue
                else:
                    try:
                        stream_id = conn.request(method=method, url=path, body=body, headers=headers)
                    except Exception as e:
                        continue
                stream_ids[stream_id] = (hostname, path, method)
    for stream_id, conn_info in stream_ids.items():
        hostname, path, method = conn_info
        try:
            rep = conn.get_response(stream_id)
        except Exception as e:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            continue
        rep_code = rep.status
        rep_header = decode_header(rep.headers)
        try:
            rep_body = rep.read()
        except Exception as e:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "status_code": rep_code,
                "rep_header": rep_header,
                "info": e.__str__()
            }
            write_data(data, kafka_topic_fail)
            return
        # print(rep_body)
        try:
            rep_body = from_wire(rep_body)
            if rep_body:
                # produce sucInfo to kafka
                data = {
                    "ip": conn.host,
                    "port": conn.port,
                    "host": hostname,
                    "path": path,
                    "method": method,
                    "connect_type": CONNECTED_HTTP2_0,
                    "status_code": rep_code,
                    "rep_header": rep_header,
                    "rep_body": rep_body.__str__()
                }
                # print(rep_body.__str__())
                write_data(data, kafka_topic_suc)
            else:
                # produce failInfo to kafka
                data = {
                    "ip": conn.host,
                    "port": conn.port,
                    "host": hostname,
                    "path": path,
                    "method": method,
                    "connect_type": CONNECTED_HTTP2_0,
                    "status_code": rep_code,
                    "rep_header": rep_header,
                    "info": rep_body.__str__()
                }
                write_data(data, kafka_topic_fail)
        except Exception:
            # produce failInfo to kafka
            data = {
                "ip": conn.host,
                "port": conn.port,
                "host": hostname,
                "path": path,
                "method": method,
                "connect_type": CONNECTED_HTTP2_0,
                "status_code": rep_code,
                "rep_header": rep_header,
                "info": rep_body.__str__()
            }
            write_data(data, kafka_topic_fail)
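# The hard-coded "?dns=..." value used for GET probes above is the RFC 8484 GET
# form of the DoH query: the DNS wire message, base64url-encoded with the "="
# padding stripped. A minimal sketch of how such a parameter could be derived
# (hypothetical helper, not part of the original code):
#
#   import base64
#
#   def encode_dns_get_param(wire_query: bytes) -> str:
#       # base64url without trailing "=" padding, as RFC 8484 requires
#       return base64.urlsafe_b64encode(wire_query).rstrip(b"=").decode()
#
#   # e.g. full_path = "%s?dns=%s" % (path, encode_dns_get_param(body))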
def probe_http1_1(conn: HTTP11Connection, host_group, path_group, methods):
    # start_time = time.time()
    for hostname in host_group:
        headers = make_doh_request_header(hostname, CONNECTED_HTTP1_1)
        for path in path_group:
            for method in methods:
                if method == "GET":
                    full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
                    try:
                        conn.request(method=method, url=full_path, headers=headers)
                    except Exception as e:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "info": e.__str__(),
                        }
                        write_data(data, kafka_topic_fail)
                        # print("GET request failed, error:", e.__str__)
                        continue  # move straight on to the next probe
                else:
                    try:
                        conn.request(method=method, url=path, body=body, headers=headers)
                    except Exception as e:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "info": e.__str__(),
                        }
                        write_data(data, kafka_topic_fail)
                        # print("POST request failed, error:", e.__str__)
                        continue
                try:
                    rep = conn.get_response()
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "info": e.__str__(),
                    }
                    write_data(data, kafka_topic_fail)
                    # print("get_response failed, error:", e)
                    continue
                rep_code = rep.status
                rep_header = decode_header(rep.headers)
                try:
                    rep_body = rep.read()
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "status_code": rep_code,
                        "rep_header": rep_header,
                        "rep_body": e.__str__()
                    }
                    write_data(data, kafka_topic_fail)
                    # print("read failed, error:", e)
                    continue
                # print(rep_body)
                try:
                    rep_body = from_wire(rep_body)
                    # # print(hostname, path, method)
                    # # print(rep_body)
                    if rep_body:
                        # produce sucInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "status_code": rep_code,
                            "rep_header": rep_header,
                            "rep_body": rep_body.__str__()
                        }
                        # print(rep_body.__str__())
                        write_data(data, kafka_topic_suc)
                    else:
                        # produce failInfo to kafka
                        data = {
                            "ip": conn.host,
                            "port": conn.port,
                            "host": hostname,
                            "path": path,
                            "method": method,
                            "connect_type": CONNECTED_HTTP1_1,
                            "status_code": rep_code,
                            "rep_header": rep_header,
                            "info": None
                        }
                        write_data(data, kafka_topic_fail)
                except Exception as e:
                    # produce failInfo to kafka
                    data = {
                        "ip": conn.host,
                        "port": conn.port,
                        "host": hostname,
                        "path": path,
                        "method": method,
                        "connect_type": CONNECTED_HTTP1_1,
                        "status_code": rep_code,
                        "rep_header": rep_header,
                        "info": e.__str__()
                    }
                    write_data(data, kafka_topic_fail)
                    # print("from_wire failed, error:", e.__str__)
@time_limited(TIMEOUT * 10)
def doh_probe(ip, port, host_group=None, path_group=None):
    verify_mode = ssl.CERT_OPTIONAL
    ctx_2_0 = make_ssl_context()
    ctx_2_0.set_alpn_protocols(['h2'])
    conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
    conn_status = UN_CONNECTED
    try:
        conn.connect()
        conn_status = CONNECTED_HTTP2_0
        # print("HTTP/2 connection established!")
    except Exception as e:
        if e.__str__().startswith(NOT_SUPPORT_FOR_HTTP2):  # expected error: fall back to HTTP/1.1
            # print("Connection failed, server does not support HTTP/2, trying HTTP/1.1")
            ...
        elif e.__str__().startswith(CERTIFICATE_VERIFY_FAILED):  # certificate error: retry with CERT_NONE
            # print("Connection failed because of the certificate, setting the sslContext to CERT_NONE")
            ctx_2_0.verify_mode = ssl.CERT_NONE
            verify_mode = ssl.CERT_NONE
            conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
            try:
                conn.connect()
                conn_status = CONNECTED_HTTP2_0
                # print("HTTP/2 connection established!")
            except Exception as e:
                # print("Connection failed, server does not support HTTP/2, trying HTTP/1.1")
                ...
        else:
            # print("Connection failed! Error:", e)
            # produce failInfo to kafka
            data = {
                "ip": ip,
                "port": port,
                "connect_type": UN_CONNECTED,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            return
    if conn_status == UN_CONNECTED:
        ctx_1_1 = make_ssl_context()
        ctx_1_1.verify_mode = verify_mode
        conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
        try:
            conn.connect()
            conn_status = CONNECTED_HTTP1_1
            # print("HTTP/1.1 connection established!")
        except Exception as e:  # give up if the connection cannot be established over HTTP/1.1 either
            # print("Connection failed! Error:", e)
            # produce failInfo to kafka
            data = {
                "ip": ip,
                "port": port,
                "connect_type": UN_CONNECTED,
                "info": e.__str__(),
            }
            write_data(data, kafka_topic_fail)
            return
    try:
        cert = conn._sock.getpeercert()
        # produce certInfo to kafka
        data = {
            "ip": ip,
            "port": port,
            "certificate": cert.__str__()
        }
        write_data(data, kafka_topic_cert)
    except Exception:
        cert = None
    if host_group:
        host_group += get_domain_from_cert(cert)
    else:
        host_group = get_domain_from_cert(cert)
    host_group += [None, ip]  # host_group always contains at least None and the raw IP address
    host_group = list(set(host_group))  # deduplicate
    default_path_group = ["/dns-query", "/", "/resolve", "/doh", "/doh/family-filter",
                          "/doh/secure-filter", "/query", "/ads", "/uncensored", "/adblock"]
    if path_group:
        path_group += default_path_group
        path_group = list(set(path_group))  # deduplicate
    else:
        path_group = default_path_group
    methods = ["POST", "GET"]
    if conn_status == CONNECTED_HTTP2_0:
        probe_http2_0(conn, host_group, path_group, methods)
    elif conn_status == CONNECTED_HTTP1_1:
        probe_http1_1(conn, host_group, path_group, methods)
    else:
        return
    try:
        conn.close()
    except Exception as e:
        ...
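# time_limited comes from the project-local util.thread_timeout module, which is
# not shown here. A minimal sketch of one common way to build such a decorator,
# assuming it simply abandons the probe after the given number of seconds (this
# is an illustration under that assumption, not the project's implementation):
#
#   import functools
#   from concurrent.futures import ThreadPoolExecutor as _Pool, TimeoutError as _Timeout
#
#   def time_limited_sketch(seconds):
#       def decorator(func):
#           @functools.wraps(func)
#           def wrapper(*args, **kwargs):
#               pool = _Pool(max_workers=1)
#               future = pool.submit(func, *args, **kwargs)
#               try:
#                   return future.result(timeout=seconds)
#               except _Timeout:
#                   return None  # give up on this probe; the worker thread is left to finish
#               finally:
#                   pool.shutdown(wait=False)
#           return wrapper
#       return decorator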
def save_status(count):
    data = {"count": count}
    with open(save_status_file, "w") as fp:
        json.dump(data, fp)
def load_status():
    if not os.path.exists(save_status_file):
        return 0
    with open(save_status_file, "r") as fp:
        a = json.load(fp)
    return a["count"]
def run_with_csv_model():
    chunks = pd.read_csv(seed_filename, iterator=True)
    count = cpu_num * thread_num * union_detect_num
    save_point = load_status()
    count_scan = 0
    while True:
        print(1)
        start_time = time.time()
        data_group = []
        try:
            chunk_data = chunks.get_chunk(count)
        except StopIteration:  # end of the seed file; the trailing partial chunk is dropped
            break
        for data in chunk_data.itertuples():
            if count_scan < save_point:  # this seed was already probed in a previous run
                count_scan += 1
                continue
            ip = data.saddr
            port = data.sport
            data_group.append((ip, port))
            count_scan += 1
        with ThreadPoolExecutor(max_workers=thread_num, thread_name_prefix="") as threadPool:
            for data in data_group:
                threadPool.submit(doh_probe, *data)
        if count_scan > save_point:
            save_status(count_scan)
        if len(data_group) != 0:
            end_time = time.time()
            print(time.strftime('%y%m%d%H', time.localtime(time.time())))
            print("CPU cores:", cpu_num, end=" ")
            print("threads per core:", thread_num, end=" ")
            print("probes finished this round:", len(data_group), end=" ")
            print("total time:", end_time - start_time, end=" ")
            print("probes per second:", len(data_group) / (end_time - start_time))
def run_with_kafka_model():
    consumer = make_seed_consumer()
    print_num = 10000  # print a progress line after every print_num probed IPs
    max_queue_size = thread_num * 10  # the task queue is ten times the size of the worker pool
    # ThreadPoolExecutor is the project-local variant from util.concurrent.futures,
    # which also accepts max_queue_size to bound the pending-task queue
    with ThreadPoolExecutor(max_workers=thread_num, max_queue_size=max_queue_size, thread_name_prefix="") as threadPool:
        count = 0
        start_time = time.time()
        for message in consumer:
            data = message.value
            ip = data["ip"]
            port = int(data["port"])
            threadPool.submit(doh_probe, ip, port)
            # print("consumed seed", ip, port)
            count += 1
            if count % print_num == 0:
                end_time = time.time()
                print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), end=" ")
                print("CPU cores:", cpu_num, end=" ")
                print("threads per core:", thread_num, end=" ")
                print("probes finished this round:", print_num, end=" ")
                print("total time:", end_time - start_time, end=" ")
                print("probes per second:", print_num / (end_time - start_time))
if __name__ == '__main__':
    # use the project root so the program can be moved between machines easily
    root_dir = sys.path[0]
    # read the configuration file
    config = configparser.ConfigParser()
    config_file = os.path.join(root_dir, "config.ini")
    config.read(config_file)
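    # Illustrative config.ini matching the keys read below (values are examples,
    # not the original configuration):
    #
    #   [DetectConfigs]
    #   cpu_num = 1
    #   thread_num = 32
    #   # "csv" or "kafka"; save_filename is only read in csv mode
    #   write_model = kafka
    #   save_filename = result.csv
    #   # "csv" or "kafka"; filename and union_detect_num are only read in csv mode
    #   seed_read_model = kafka
    #   filename = seed.csv
    #   union_detect_num = 100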
    cpu_num = config.getint('DetectConfigs', 'cpu_num', fallback=1)  # CPU load is low, so multiprocessing is not used for now
    thread_num = config.getint('DetectConfigs', 'thread_num', fallback=32)
    write_model = config.get('DetectConfigs', 'write_model')
    if write_model == "csv":
        save_filename = os.path.join(root_dir, config.get('DetectConfigs', 'save_filename'))
        writer = csv.writer(open(save_filename, "w", newline=""))
    elif write_model == "kafka":
        writer = make_kafka_producer()
    else:
        print("error write model!")
        sys.exit()
    seed_read_model = config.get('DetectConfigs', 'seed_read_model')
    if seed_read_model == "csv":
        union_detect_num = config.getint('DetectConfigs', 'union_detect_num', fallback=100)  # probes per thread per batch
        seed_filename = os.path.join(root_dir, config.get('DetectConfigs', 'filename'))
    elif seed_read_model == "kafka":
        union_detect_num = None
        seed_filename = None
    else:
        print("error seed_read_model!")
        sys.exit()
    # progress file recording how far the scan has gone, so a crash does not cause seeds to be probed twice
    save_status_path = os.path.join(root_dir, "./status")
    save_status_file = os.path.join(save_status_path, "status.json")
    if not os.path.exists(save_status_path):
        os.mkdir(save_status_path)
    lock = threading.Lock()
    body = make_doh_request_body()
    # producers = [None] * thread_num
    print("Starting probe:")
    print("-----", "processes:", cpu_num)
    print("-----", "threads:", thread_num)
    if seed_read_model == "csv":
        print("-----", "probes per thread:", union_detect_num)
        run_with_csv_model()
    elif seed_read_model == "kafka":
        run_with_kafka_model()