This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
wujiating-diamondv/util/get_cert.py

119 lines
4.5 KiB
Python
Raw Normal View History

2022-05-19 14:55:02 +08:00
import time
from hyper import HTTP20Connection, HTTP11Connection
from util.thread_timeout import time_limited
from main import TIMEOUT, UN_CONNECTED, CONNECTED_HTTP1_1, CONNECTED_HTTP2_0, NOT_SUPPORT_FOR_HTTP2, \
make_doh_request_body, CERTIFICATE_VERIFY_FAILED
import threading
from util.util_http import make_ssl_context
import ssl, socket, sys, csv
from hyper.http20.exceptions import ConnectionError
from concurrent.futures import ThreadPoolExecutor
import os
# First entry of sys.path; the results CSV is anchored to this directory.
# NOTE(review): for a script run this is normally the script's own directory - confirm
# that is the intended output location.
root_dir = sys.path[0]
# Hour-resolution local timestamp (yymmddHH) used as the output file name.
datetime_str = time.strftime('%y%m%d%H')
save_dir = "./verify"
# Create the directory at the exact location the CSV is opened from. The previous
# code checked/created "./verify" relative to the current working directory, which
# is a different place whenever cwd != root_dir, so open() below could fail.
# exist_ok=True also removes the exists()/mkdir() race.
os.makedirs(os.path.join(root_dir, save_dir), exist_ok=True)
save_file = os.path.join(root_dir, save_dir, f"{datetime_str}.csv")
# newline="" lets the csv module control line endings itself.
f = open(save_file, "w", newline="")
f_verify = csv.writer(f)
# Guards f_verify, which is shared by all worker threads.
lock = threading.Lock()
body = make_doh_request_body()
@time_limited(TIMEOUT * 10)
def doh_verify(ip, port):
    """Connect to ``ip:port`` over TLS and record the peer certificate.

    An HTTP/2 connection is attempted first. If it fails with a certificate
    verification error, it is retried with certificate verification disabled.
    If HTTP/2 could not be established (server does not support it, or the
    retry failed), an HTTP/1.1 connection is attempted with the same
    verification mode. Once connected, one row ``[ip, str(cert)]`` is appended
    to the shared ``f_verify`` CSV writer under ``lock``.

    :param ip: address to connect to.
    :param port: TCP port to connect to.
    :return: ``False`` when no connection could be established; ``None``
        after a row has been written (or printed on an encoding failure).
    """
    # Errors tolerated while opening an HTTP/2 connection. ``ConnectionError``
    # here is the hyper exception imported at file level, not the builtin.
    h2_connect_errors = (
        AssertionError, TimeoutError, FileNotFoundError, ConnectionResetError,
        ConnectionRefusedError, OSError,
        ssl.SSLCertVerificationError, ssl.SSLError,
        socket.timeout,
        ConnectionError,  # hyper error
    )

    verify_mode = ssl.CERT_OPTIONAL
    ctx_2_0 = make_ssl_context()
    ctx_2_0.set_alpn_protocols(['h2'])
    conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
    conn_status = UN_CONNECTED
    try:
        conn.connect()
        conn_status = CONNECTED_HTTP2_0
    except h2_connect_errors as e:
        message = str(e)
        if message.startswith(NOT_SUPPORT_FOR_HTTP2):
            # Expected failure: the server does not speak HTTP/2, so fall
            # through to the HTTP/1.1 attempt below.
            pass
        elif message.startswith(CERTIFICATE_VERIFY_FAILED):
            # Certificate problem: retry HTTP/2 without verification and
            # remember the relaxed mode for a possible HTTP/1.1 attempt.
            ctx_2_0.verify_mode = ssl.CERT_NONE
            verify_mode = ssl.CERT_NONE
            conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
            try:
                conn.connect()
                conn_status = CONNECTED_HTTP2_0
            except h2_connect_errors:
                # HTTP/2 still failed; fall through to HTTP/1.1.
                pass
        else:
            # Any other connection failure: give up on this host.
            return False

    if conn_status == UN_CONNECTED:
        ctx_1_1 = make_ssl_context()
        ctx_1_1.verify_mode = verify_mode
        conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
        try:
            conn.connect()
            conn_status = CONNECTED_HTTP1_1
        except (TimeoutError, ConnectionRefusedError, FileNotFoundError, ConnectionAbortedError,
                ssl.SSLCertVerificationError, ssl.SSLError,
                socket.timeout, OSError):
            # HTTP/1.1 could not connect either: give up on this host.
            return False

    try:
        cert = conn._sock.getpeercert()
    except AttributeError:
        # The connection object exposes no usable socket.
        cert = None
    # NOTE(review): per the ssl docs getpeercert() returns an empty dict when the
    # certificate was not validated (CERT_NONE) - confirm that is acceptable here.
    # NOTE(review): the connection is never explicitly closed in this function.

    cert_text = str(cert)
    # ``with`` guarantees the lock is released even if writerow raises something
    # other than UnicodeEncodeError; a leaked lock would block every other worker.
    with lock:
        try:
            f_verify.writerow([ip, cert_text])
        except UnicodeEncodeError:
            # The output file's encoding cannot represent the text; fall back
            # to printing it.
            print(ip, cert_text)
def run_with_thread_pool(max_workers, data_group, fn):
    """Call ``fn(*args)`` for every argument tuple in ``data_group`` on a thread pool.

    Blocks until all submitted calls have finished. The futures are discarded,
    so return values and exceptions raised by ``fn`` are not reported.

    :param max_workers: number of worker threads in the pool.
    :param data_group: iterable of argument tuples, each unpacked into ``fn``.
    :param fn: callable executed once per argument tuple.
    :return: ``None``.
    """
    executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="")
    # Leaving the ``with`` block waits for every queued task to complete.
    with executor:
        for args in data_group:
            executor.submit(fn, *args)
if __name__ == '__main__':
    try:
        # newline="" is what the csv module recommends for files handed to csv.reader.
        with open("./suspect/no_cert_ips.csv", newline="") as fp:
            # The first column of each row is the IP to probe on port 443.
            # Blank lines parse to an empty list and are skipped, so that
            # row[0] cannot raise IndexError.
            data_group = [(row[0], 443) for row in csv.reader(fp) if row]
        print(len(data_group))
        run_with_thread_pool(64, data_group, doh_verify)
    finally:
        # Always flush and close the results CSV, even if reading the input
        # or running the scan raises.
        f.close()