上传新文件

This commit is contained in:
何勇
2024-07-03 12:48:08 +00:00
parent 0444e7347f
commit 87c448e2cd

136
SignatureExtract.py Normal file
View File

@@ -0,0 +1,136 @@
#!/usr/bin/evn python
# -*- coding:utf-8 -*-
import sys
import os
import json
import GetSignature_2402
import logging
# Setting the log level
logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.DEBUG)
# logging.basicConfig(level=logging.WARNING)
# logging.basicConfig(level=logging.ERROR)
# Determine whether the selected file is a Pcap file
def IsPcapFile(filename):
ret_str = os.popen("tshark -r " + filename + ' -Y "frame.number==1" -T fields -e frame.number').read()
if ret_str == '':
return False
return True
# TCP data flow analysis
def TcpDataFlowAnalysis(signature_object, stream_dict, tcp_frame_signature_list):
stream_dict["tcp.payload.c2s_first_data"] = signature_object.tcp_c2s_first_data(tcp_frame_signature_list)
stream_dict["tcp.payload.s2c_first_data"] = signature_object.tcp_s2c_first_data(tcp_frame_signature_list)
stream_dict["tcp.payload.c2s_first_data_len"] = signature_object.tcp_c2s_first_data_len(tcp_frame_signature_list)
stream_dict["tcp.payload.s2c_first_data_len"] = signature_object.tcp_s2c_first_data_len(tcp_frame_signature_list)
stream_dict["tcp.payload"] = signature_object.tcp_get_payload(tcp_frame_signature_list)
return
# UDP data flow analysis
def UdpDataFlowAnalysis(signature_object, stream_dict, udp_frame_signature_list):
stream_dict["udp.payload.c2s_first_data"] = signature_object.udp_c2s_first_data(udp_frame_signature_list)
stream_dict["udp.payload.s2c_first_data"] = signature_object.udp_s2c_first_data(udp_frame_signature_list)
stream_dict["udp.payload.c2s_first_data_len"] = signature_object.udp_c2s_first_data_len(udp_frame_signature_list)
stream_dict["udp.payload.s2c_first_data_len"] = signature_object.udp_s2c_first_data_len(udp_frame_signature_list)
stream_dict["udp.payload"] = signature_object.udp_get_payload(udp_frame_signature_list)
return
# General data flow analysis (common, ip, dns, http, ssl)
def GeneralDataFlowAnalysis(signature_object, stream_dict, frame_signature_list):
# common
stream_dict["common.server_fqdn"] = signature_object.ssl_extensions_server_name(frame_signature_list)
stream_dict["common.app_id"] = ['unknow']
if frame_signature_list[0]['ip.proto'] == '6' :
stream_dict["srcport"] = signature_object.tcp_srcport(frame_signature_list)
stream_dict["dstport"] = signature_object.tcp_dstport(frame_signature_list)
else:
stream_dict["srcport"] = signature_object.udp_srcport(frame_signature_list)
stream_dict["dstport"] = signature_object.udp_dstport(frame_signature_list)
# ip
stream_dict["ip.src"] = signature_object.ip_src(frame_signature_list)
stream_dict["ip.dst"] = signature_object.ip_dst(frame_signature_list)
stream_dict["ip.proto"] = signature_object.ip_proto(frame_signature_list)
stream_dict["heartbeat_flag"] = signature_object.heartbeat_flag(frame_signature_list)
# dns
stream_dict["dns.qry.name"] = signature_object.dns_qry_name(frame_signature_list)
# http
stream_dict["http.request.full_uri"] = signature_object.http_request_full_uri(frame_signature_list)
stream_dict["http.request.header"] = signature_object.http_request_header(frame_signature_list)
stream_dict["http.response.header"] = signature_object.http_response_header(frame_signature_list)
# ssl
stream_dict["ssl.handshake.certificate.algorithm_identifier"] = signature_object.ssl_algorithm_identifier(frame_signature_list)
stream_dict["ssl.handshake.certificate.serial_number"] = signature_object.ssl_serial_number(frame_signature_list)
stream_dict["ssl.handshake.certificate.issuer_common_name"] = signature_object.ssl_issuer_common_name(frame_signature_list)
stream_dict["ssl.handshake.certificate.issuer_organization_name"] = signature_object.ssl_issuer_organization_name(frame_signature_list)
stream_dict["ssl.handshake.certificate.issuer_country_name"] = signature_object.ssl_issuer_country_name(frame_signature_list)
stream_dict["ssl.handshake.certificate.subject_common_name"] = signature_object.ssl_subject_common_name(frame_signature_list)
stream_dict["ssl.handshake.certificate.subject_organization_name"] = signature_object.ssl_subject_organization_name(frame_signature_list)
stream_dict["ssl.handshake.certificate.subject_country_name"] = signature_object.ssl_subject_country_name(frame_signature_list)
stream_dict["ssl.handshake.certificate.not_valid_before"] = signature_object.ssl_not_valid_before(frame_signature_list)
stream_dict["ssl.handshake.certificate.not_valid_after"] = signature_object.ssl_not_valid_after(frame_signature_list)
stream_dict["ssl.handshake.certificate.algorithm_id"] = signature_object.ssl_algorithm_id(frame_signature_list)
stream_dict["ssl.analysis.ja3"] = signature_object.ssl_ja3(frame_signature_list)
stream_dict["ssl.analysis.sni_absent"] = signature_object.sni_absent(frame_signature_list)
stream_dict["ssl.analysis.ech_enabled"] = signature_object.ssl_ech_enabled(frame_signature_list)
stream_dict["ssl.analysis.esni_enabled"] = signature_object.ssl_analysis_esni_enabled(frame_signature_list)
return
if __name__=="__main__":
# Get the pcap file name in the main function parameter
if len(sys.argv) < 2 :
logging.error("Please enter the correct parameters !!")
sys.exit()
pacp_file_path = sys.argv[1]
# determine file is pcap
if IsPcapFile(pacp_file_path):
# Define the result output dict
result_output_dict = {}
# Creation signature extraction objects
signature_object = GetSignature_2402.GetStreamSignatureFromTshrak(pacp_file_path)
# Get all the field dictionaries parsed based on the Tshark command
all_frame_signature_dict_list = signature_object._output_dict_list
# Get basic information of TCP data streams
tcp_stream_basic_info_list = GetSignature_2402.GetTCPStreamBaseInfo(all_frame_signature_dict_list)
tcp_stream_all_info_list = tcp_stream_basic_info_list
# Get other information of TCP data streams
# Processing data stream by stream
for i in range(len(tcp_stream_all_info_list)):
# Get all the Frame IDs of the data stream
tcp_frame_signature_list = signature_object.GetOneTcpFrameSignatureList(tcp_stream_all_info_list[i]['StreamID'])
# Merge signature information from all Frame IDs
# TCP data flow analysis
TcpDataFlowAnalysis(signature_object, tcp_stream_all_info_list[i], tcp_frame_signature_list)
# General data flow analysis (common, ip, dns, http, ssl)
GeneralDataFlowAnalysis(signature_object, tcp_stream_all_info_list[i], tcp_frame_signature_list)
# Get basic information of UDP data streams
udp_stream_basic_info_list = GetSignature_2402.GetUDPStreamBaseInfo(all_frame_signature_dict_list)
udp_stream_all_info_list = udp_stream_basic_info_list
# Get other information of UDP data streams
# Processing data stream by stream
for i in range(len(udp_stream_all_info_list)):
# Get all the Frame IDs of the data stream
udp_frame_signature_list = signature_object.GetOneUdpFrameSignatureList(udp_stream_all_info_list[i]['StreamID'])
# Merge signature information from all Frame IDs
# UDP data flow analysis
UdpDataFlowAnalysis(signature_object, udp_stream_all_info_list[i], udp_frame_signature_list)
# General data flow analysis (common, ip, dns, http, ssl)
GeneralDataFlowAnalysis(signature_object, udp_stream_all_info_list[i], udp_frame_signature_list)
# Merge all data stream results
result_output_dict = tcp_stream_all_info_list + udp_stream_all_info_list
# Write signature dictionary to json
with open('signature.json', 'w', encoding='utf-8') as f:
json.dump(result_output_dict, f, ensure_ascii=False, indent=4)
else:
logging.error("The input is not a pcap file !!")
with open('signature.json', 'w', encoding='utf-8') as f:
json.dump({'Error':'The input is not a pcap file'}, f, ensure_ascii=False, indent=4)
pass