This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
wujiating-diamondv/util/get_cert.py
nanwct 8533fa17cf abc
2022-05-19 14:55:02 +08:00

119 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from hyper import HTTP20Connection, HTTP11Connection
from util.thread_timeout import time_limited
from main import TIMEOUT, UN_CONNECTED, CONNECTED_HTTP1_1, CONNECTED_HTTP2_0, NOT_SUPPORT_FOR_HTTP2, \
make_doh_request_body, CERTIFICATE_VERIFY_FAILED
import threading
from util.util_http import make_ssl_context
import ssl, socket, sys, csv
from hyper.http20.exceptions import ConnectionError
from concurrent.futures import ThreadPoolExecutor
import os
# Output paths are anchored to the first entry of sys.path rather than to the
# current working directory.
root_dir = sys.path[0]
# Hour-granular timestamp (yymmddHH) used as the CSV file name, so runs within
# the same hour overwrite the same file.
datetime_str = time.strftime('%y%m%d%H')
save_dir = "./verify"
# Create the directory that save_file actually lives in. The previous code
# checked/created "./verify" relative to the working directory, which is a
# different location whenever the working directory is not root_dir, and then
# failed when opening save_file. exist_ok=True also removes the
# check-then-create race.
os.makedirs(os.path.join(root_dir, save_dir), exist_ok=True)
save_file = os.path.join(root_dir, save_dir, f"{datetime_str}.csv")
# The file handle stays open for the lifetime of the module; worker threads
# write rows through f_verify while holding `lock`.
f = open(save_file, "w", newline="")
f_verify = csv.writer(f)
# Serialises access to f_verify across worker threads.
lock = threading.Lock()
body = make_doh_request_body()
def _close_quietly(conn):
    """Close a hyper connection, ignoring errors raised while closing."""
    try:
        conn.close()
    except (OSError, AttributeError):
        # Best effort only: the peer may already be gone, or the connection
        # object may hold no socket at all.
        pass


@time_limited(TIMEOUT * 10)
def doh_verify(ip, port):
    """Connect to ``ip:port`` over TLS and record the peer certificate.

    An HTTP/2 connection is attempted first. If it fails with a certificate
    verification error, HTTP/2 is retried with verification switched off. If
    no HTTP/2 connection could be made, HTTP/1.1 is attempted with the
    verification mode chosen so far. Once connected, the peer certificate is
    read and the row ``[ip, str(cert)]`` is written to the module-level CSV
    writer ``f_verify`` under ``lock``.

    Args:
        ip: Host address passed to the hyper connection classes.
        port: Port to connect to.

    Returns:
        ``False`` when no connection could be established; ``None`` after a
        row has been written (or printed as a fallback). What the caller of
        the decorated function sees also depends on ``time_limited``.
    """
    # Every standard-library error the original handlers listed
    # (TimeoutError, FileNotFoundError, ConnectionResetError,
    # ConnectionRefusedError, ssl.SSLError, ssl.SSLCertVerificationError,
    # socket.timeout) is an OSError subclass, so OSError covers them all.
    # ConnectionError here is the hyper HTTP/2 exception imported at the top
    # of the file, not the builtin.
    http2_errors = (AssertionError, OSError, ConnectionError)

    verify_mode = ssl.CERT_OPTIONAL
    ctx_2_0 = make_ssl_context()
    ctx_2_0.set_alpn_protocols(['h2'])
    conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
    conn_status = UN_CONNECTED
    try:
        conn.connect()
        conn_status = CONNECTED_HTTP2_0
    except http2_errors as e:
        reason = str(e)
        if reason.startswith(NOT_SUPPORT_FOR_HTTP2):
            # Expected failure: fall through and try HTTP/1.1 below.
            pass
        elif reason.startswith(CERTIFICATE_VERIFY_FAILED):
            # The certificate was rejected: retry HTTP/2 with verification
            # disabled, and remember the mode for a possible HTTP/1.1 attempt.
            ctx_2_0.verify_mode = ssl.CERT_NONE
            verify_mode = ssl.CERT_NONE
            conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
            try:
                conn.connect()
                conn_status = CONNECTED_HTTP2_0
            except http2_errors:
                # Still no HTTP/2 connection: fall through to HTTP/1.1.
                pass
        else:
            # Any other failure is treated as final.
            return False

    if conn_status == UN_CONNECTED:
        ctx_1_1 = make_ssl_context()
        ctx_1_1.verify_mode = verify_mode
        conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
        try:
            conn.connect()
            conn_status = CONNECTED_HTTP1_1
        except OSError:
            # HTTP/1.1 failed as well: give up on this address.
            return False

    # NOTE(review): per the ssl documentation, getpeercert() returns an empty
    # dict when the certificate was not validated, so connections made with
    # CERT_NONE will likely record "{}" - confirm this is intended.
    try:
        cert = conn._sock.getpeercert()
    except AttributeError:
        cert = None
    finally:
        # The connection is not needed past this point; close it so a long
        # scan does not accumulate open sockets.
        _close_quietly(conn)

    # `with` guarantees the lock is released even if writerow raises something
    # other than UnicodeEncodeError; otherwise every other worker would block.
    with lock:
        try:
            f_verify.writerow([ip, str(cert)])
        except UnicodeEncodeError:
            # The output file's encoding cannot represent the certificate
            # text; fall back to printing it.
            print(ip, str(cert))
def run_with_thread_pool(max_workers, data_group, fn):
    """Run ``fn(*args)`` for every argument tuple in ``data_group`` on a thread pool.

    Args:
        max_workers: Maximum number of worker threads.
        data_group: Iterable of argument tuples, each unpacked into ``fn``.
        fn: Callable executed once per tuple.

    Returns:
        None. The call blocks until all submitted tasks have finished; the
        futures are discarded, so task results and exceptions are not
        reported to the caller.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for call_args in data_group:
            executor.submit(fn, *call_args)
if __name__ == '__main__':
    # Script entry point: read the target IPs from the first CSV column, then
    # probe each of them on port 443 with 64 worker threads.
    try:
        # newline="" is what the csv module documentation asks for when
        # handing a file object to csv.reader.
        with open("./suspect/no_cert_ips.csv", newline="") as fp:
            # Blank lines are parsed as empty rows; skip them instead of
            # failing on row[0].
            data_group = [(row[0], 443) for row in csv.reader(fp) if row]
        print(len(data_group))
        run_with_thread_pool(64, data_group, doh_verify)
    finally:
        # Close the results file even if reading the input or running the
        # pool fails.
        f.close()