This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
zhuyujia-webhopper/Infra_analyzer/infra_measure.py
little_stone bd2d50cf35 code update
2022-05-05 20:41:28 +08:00

301 lines
8.3 KiB
Python

import dns.resolver
import ssl
import pandas as pd
import OpenSSL
import rsa
from cryptography import x509
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import os
from Tools.domain_extract import Extracter
import re
from Infrastructure.infra2db import Infra2DB
import threading
import eventlet
import collections
import time
# Shared module state for the threaded measurement runs below.
count = 0  # currently unused counter
# Serializes appends to the shared DataFrames from worker threads.
lock = threading.Lock()
# Import-time side effect: patch stdlib networking to be cooperative so
# eventlet.Timeout can interrupt the blocking certificate fetch.
eventlet.monkey_patch()
class DNSResolver:
    """Resolve SOA/NS records for the registrable domain of a resource URL.

    ``dat`` accumulates one row per resolved resource (schema below); the
    resolution itself is done by the static :meth:`dnsQuery`.
    """

    def __init__(self):
        self.port = 443
        # One row per measured resource URL.
        self.dat = pd.DataFrame({
            "resource_url": [],
            "host": [],
            "website": [],
            "SOA": [],
            "NS": [],
        })

    @staticmethod
    def dnsQuery(resource_url, website):
        """Resolve the SOA and NS records of *resource_url*'s registrable domain.

        Returns a tuple ``(soa, ns, is_privacy)`` where ``soa`` is the last
        SOA rdata seen, ``ns`` the list of NS rdatas, and ``is_privacy`` is 1
        when at least one nameserver lives under the site's own registrable
        domain (i.e. the DNS is self-hosted).
        """
        _, domain = Extracter.extract(resource_url)
        soa, ns = None, []
        SOA = dns.resolver.resolve(domain, "SOA")
        for answer in SOA.response.answer:
            for rdata in answer.items:
                soa = rdata
        NS = dns.resolver.resolve(domain, "NS")
        for answer in NS.response.answer:
            for rdata in answer.items:
                ns.append(rdata)
        is_privacy = 0
        for n in ns:
            # BUG FIX: rdata objects must be stringified before extraction,
            # matching how CertResolver.get_NS handles the same records.
            _, d = Extracter.extract(str(n))
            if d == domain:
                is_privacy = 1
        # BUG FIX: the original computed these values and silently dropped them.
        return soa, ns, is_privacy
class CertResolver:
    """Measure TLS/PKI and DNS infrastructure dependencies of resource URLs.

    For each resource URL the resolver downloads the server certificate and
    resolves DNS records, then classifies the supporting infrastructure
    (nameservers, CRL / OCSP / CA-issuer endpoints) as self-hosted
    ("private", 1) or third-party (0).  Rows are appended to the shared
    DataFrames ``dat`` / ``dnsdat`` under the module-level ``lock`` so the
    methods can be driven from a thread pool.
    """

    def __init__(self):
        self.port = 443
        # Certificate-derived dependency rows (one per resource URL).
        self.dat = pd.DataFrame({
            "resource_url": [],
            "host": [],
            "website": [],
            "isHttps": [],
            "crl": [],
            "ocsp": [],
            "ca_url": [],
            "issuer": [],
            "isPrivate": [],
            "websiteSAN": [],
        }, dtype=object)
        # Nameserver dependency rows (one per resource URL).
        self.dnsdat = pd.DataFrame({
            "resource_url": [],
            "nameserver": [],
            "website": [],
            "isPrivate": [],
        })

    def getCertObj(self, hostname):
        """Fetch *hostname*'s TLS certificate on ``self.port`` and parse it.

        Returns the parsed ``x509.Certificate``, or ``None`` when the fetch
        does not complete within 5 seconds (``eventlet.Timeout`` with
        ``exception=False`` silently aborts the block).  TLS/socket errors
        propagate to the caller.
        """
        with eventlet.Timeout(5, False):
            cert = ssl.get_server_certificate((hostname, self.port)).encode()
            cert_obj = x509.load_pem_x509_certificate(cert)
            print("success")
            return cert_obj
        print("failed")
        # BUG FIX: make the timeout path explicit; callers must check for None.
        return None

    @staticmethod
    def _san_values(cert_obj):
        """Return the set of Subject Alternative Name values of *cert_obj*."""
        san = cert_obj.extensions.get_extension_for_class(x509.SubjectAlternativeName)
        return {item.value for item in san.value}

    def get_NS(self, resource_url, website):
        """Resolve *resource_url*'s nameservers and record self-hosting.

        Appends one row to ``self.dnsdat`` and returns the list of
        nameserver strings; returns the exception on a fetch error and
        ``None`` when the certificate fetch timed out.
        """
        # domain is the registrable (second-level) domain of the resource.
        hostname, domain = Extracter.extract(resource_url)
        print(hostname)
        try:
            cert_obj = self.getCertObj(hostname)
        except Exception as e:
            print("Error:", e)
            return e
        # BUG FIX: getCertObj returns None on timeout; the original then
        # crashed with an uncaught AttributeError reading the SAN extension.
        if cert_obj is None:
            return None
        san_set = self._san_values(cert_obj)
        nameserver = []
        NS = dns.resolver.resolve(domain, "NS")
        for answer in NS.response.answer:
            for rdata in answer.items:
                nameserver.append(str(rdata))
        # A nameserver counts as "private" when it lives under the site's
        # own domain, or its domain appears in the certificate's SAN set.
        isPrivate = 0
        for ns in nameserver:
            _, sectld = Extracter.extract(ns)
            if sectld == domain or sectld in san_set:
                isPrivate = 1
                break
        with lock:
            self.dnsdat.loc[self.dnsdat.shape[0]] = [resource_url, nameserver, website, isPrivate]
        return nameserver

    def get_CRL_OSCP(self, resource_url, website):
        """Extract CRL, OCSP and CA-issuer URLs from *resource_url*'s certificate.

        Classifies the CA endpoint as self-hosted or third-party and appends
        one row to ``self.dat``.  Returns ``(ocsp, crl, is_private)`` on
        success, the exception on a fetch error, and ``None`` when the
        certificate fetch timed out.
        """
        hostname, domain = Extracter.extract(resource_url)
        print(hostname)
        try:
            cert_obj = self.getCertObj(hostname)
        except Exception as e:
            print("Error:", e)
            return e
        # BUG FIX: guard the timeout path (see get_NS).
        if cert_obj is None:
            return None
        # Registrable domain of the certificate host.
        _, tld = Extracter.extract(hostname)
        issuer = cert_obj.issuer
        san_set = self._san_values(cert_obj)
        # CRL distribution point URLs.
        crl = []
        CRL = cert_obj.extensions.get_extension_for_class(x509.CRLDistributionPoints)
        for point in CRL.value:
            for name in point.full_name:
                crl.append(name.value)
        # OCSP responder and CA-issuer URL from Authority Information Access;
        # .crt/.der locations are the CA certificate, anything else is OCSP.
        ca_url, ocsp = None, None
        AIA = cert_obj.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
        for desc in AIA.value:
            item = desc.access_location.value
            if item.endswith(".crt") or item.endswith(".der"):
                ca_url = item
            else:
                ocsp = item
        # Classify the CA endpoint: private when it shares the site's
        # registrable domain or appears in the certificate SAN set.
        is_private = 0
        # BUG FIX: ca_url may be absent; the original passed None to
        # Extracter.extract, which crashed the worker thread.
        if ca_url is not None:
            _, ca_tld = Extracter.extract(ca_url)
            if ca_tld == tld or ca_tld in san_set:
                is_private = 1
        with lock:
            self.dat.loc[self.dat.shape[0]] = [resource_url, hostname, website, 1, tuple(crl), ocsp, ca_url, str(issuer), is_private, list(san_set)]
        print(ocsp, crl, is_private)
        return ocsp, crl, is_private
if __name__ == "__main__":
c = CertResolver()
writer = Infra2DB()
df = pd.read_csv("../sd/top_1w_rank10_with_score.csv")
print(df.info())
beg = time.time()
pool = ThreadPoolExecutor(max_workers=6)
all_task = []
# 24000 - 30000
for _, row in df.iterrows():
print(_, row["resource_url"], row["website"])
all_task.append(pool.submit(c.get_NS, row["resource_url"], row["website"]))
wait(all_task, timeout=5)
pool.shutdown()
end = time.time()
print(end - beg)
print(c.dnsdat.info())
print(c.dnsdat.head())
c.dnsdat.to_csv("../sd/DNSdep.csv", index=False)
# writer.writeDB(c.dat)
# print(c.dat.info())
# print(c.dat.head())
# c.dat.to_csv("../sd/CAdep.csv", index=False)
# dic = collections.defaultdict(int)
# df = pd.read_csv("ocdp.csv")
# for _, row in df.iterrows():
# h, domain = Extracter.extract(row["d"])
# dic[domain] += row["n"]
# print(dic)
#
# df = pd.DataFrame({
# "domain": [],
# "value": [],
# "ns": [],
# })
# for key, value in dic.items():
#
# NS = dns.resolver.resolve(key, "NS")
# li = []
# for i in NS.response.answer:
# for j in i.items:
# li.append(j)
# df.loc[df.shape[0]] = [key, value, list(li)]
# df.to_csv("ca-dns.csv", index=False)
#
# dic = collections.defaultdict(int)
# for _, row in df.iterrows():
# for j in row["ns"]:
# dic[str(j)] += row["value"]
#
# f = open("jianjieNS.csv", "w")
# for key, value in dic.items():
# f.write(key + "," + str(value) + "\n")
#
# pool = ThreadPoolExecutor(max_workers=6)
# all_task = []
# for _, row in df.iterrows():
# print(_, row["website"])
# all_task.append(pool.submit(gs.getSAN, row["website"]))
# wait(all_task, timeout=5)
# pool.shutdown()
# gs.dat.to_csv("sanlist", index=False)
# df = pd.read_csv("../research_1/top_1w_rank10_with_score 2.csv")
# print(df.info())
# beg = time.time()
# pool = ThreadPoolExecutor(max_workers=6)
# all_task = []
# # 24000 - 30000
# for _, row in df[:100].iterrows():
# print(_, row["resource_url"], row["website"])
#
# all_task.append(pool.submit(c.get_CRL_OSCP, row["resource_url"], row["website"]))
# wait(all_task, timeout=5)
# # # f = open("../research_1/human_test.txt", "r")
# # # data = f.read().split("\n")
# # # f.close()
# # # for hostname in data:
# # # print(hostname)
# # # pool.submit(c.get_CRL_OSCP, "cd", "bc", hostname)
# #
# pool.shutdown()
# end = time.time()
# print(end - beg)
#
# print(c.dat.info())
# print(c.dat.head())
# c.dat.to_csv("CAdep.csv", index=False)
# # # writer.writeDB(c.dat)
# writer = Infra2DB()
#
# dat = pd.DataFrame({
# "resource_url": ["https:/ww.ww.ww", "asdf"],
# "host": ["baidu.com", "wef"],
# "website": ["baidu.com", "weew"],
# "isHttps": [1, 1],
# "crl": [('efdd', ), ('deaf')],
# "ocsp": ["httpsd?:ef", "awe"],
# "ca_url": ["ecece", "aweda"],
# "isPrivate": [1, 1],
# }, dtype=object)
# writer.writeDB(dat)
# 获得每个网站的SAN SET