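#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""Extract per-stream signatures from a pcap file and write them to signature.json.

Usage: python <this script> <pcap file>

The actual work (running tshark, grouping frames into TCP/UDP streams and
pulling individual fields out of them) lives in the GetSignature_2402 module;
this script orchestrates those calls and assembles the output records.
"""
import json
import logging
import os
import subprocess
import sys

import GetSignature_2402

# Setting the log level (switch to DEBUG / WARNING / ERROR as needed)
logging.basicConfig(level=logging.INFO)

# File the result records (or the error record) are written to, in the
# current working directory.
OUTPUT_FILE = 'signature.json'

# tshark arguments that print the number of frame 1 as a bare field; any
# output at all means tshark was able to decode the file as a capture.
_PCAP_PROBE_ARGS = ['-Y', 'frame.number==1', '-T', 'fields', '-e', 'frame.number']


# Determine whether the selected file is a Pcap file
def IsPcapFile(filename):
    """Return True if *filename* is a capture file tshark can read.

    The path comes straight from the command line, so tshark is started with
    an argument list and no shell: a path containing shell metacharacters is
    passed to tshark verbatim instead of being interpreted.  tshark still has
    to parse the (untrusted) capture contents itself, so run this with no more
    privilege than needed.

    Args:
        filename: path to the candidate capture file.

    Returns:
        True if tshark printed frame 1's number; False if the path is not a
        regular file, tshark is not installed, or tshark produced no output.
    """
    if not os.path.isfile(filename):
        logging.debug("%s is not a regular file", filename)
        return False
    try:
        completed = subprocess.run(
            ['tshark', '-r', filename] + _PCAP_PROBE_ARGS,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except FileNotFoundError:
        # tshark itself is missing. The previous shell pipeline silently
        # reported "not a pcap" here; keep that outcome but say why.
        logging.error("tshark not found on PATH; cannot inspect %s", filename)
        return False
    if completed.stderr:
        logging.debug("tshark stderr: %s", completed.stderr.strip())
    # Empty stdout means tshark could not decode a first frame.
    return completed.stdout.strip() != ''


# TCP data flow analysis
def TcpDataFlowAnalysis(signature_object, stream_dict, tcp_frame_signature_list):
    """Add the TCP payload fields of one stream to *stream_dict* in place.

    Args:
        signature_object: GetSignature_2402 extractor exposing the tcp_* methods.
        stream_dict: the stream's result record (mutated; keys are the output
            JSON field names).
        tcp_frame_signature_list: per-frame field dicts belonging to this stream.
    """
    stream_dict["tcp.payload.c2s_first_data"] = signature_object.tcp_c2s_first_data(tcp_frame_signature_list)
    stream_dict["tcp.payload.s2c_first_data"] = signature_object.tcp_s2c_first_data(tcp_frame_signature_list)
    stream_dict["tcp.payload.c2s_first_data_len"] = signature_object.tcp_c2s_first_data_len(tcp_frame_signature_list)
    stream_dict["tcp.payload.s2c_first_data_len"] = signature_object.tcp_s2c_first_data_len(tcp_frame_signature_list)
    stream_dict["tcp.payload"] = signature_object.tcp_get_payload(tcp_frame_signature_list)


# UDP data flow analysis
def UdpDataFlowAnalysis(signature_object, stream_dict, udp_frame_signature_list):
    """Add the UDP payload fields of one stream to *stream_dict* in place.

    Args:
        signature_object: GetSignature_2402 extractor exposing the udp_* methods.
        stream_dict: the stream's result record (mutated; keys are the output
            JSON field names).
        udp_frame_signature_list: per-frame field dicts belonging to this stream.
    """
    stream_dict["udp.payload.c2s_first_data"] = signature_object.udp_c2s_first_data(udp_frame_signature_list)
    stream_dict["udp.payload.s2c_first_data"] = signature_object.udp_s2c_first_data(udp_frame_signature_list)
    stream_dict["udp.payload.c2s_first_data_len"] = signature_object.udp_c2s_first_data_len(udp_frame_signature_list)
    stream_dict["udp.payload.s2c_first_data_len"] = signature_object.udp_s2c_first_data_len(udp_frame_signature_list)
    stream_dict["udp.payload"] = signature_object.udp_get_payload(udp_frame_signature_list)


# General data flow analysis (common, ip, dns, http, ssl)
def GeneralDataFlowAnalysis(signature_object, stream_dict, frame_signature_list):
    """Add the protocol-independent fields of one stream to *stream_dict* in place.

    Covers the common / ip / dns / http / ssl groups of the output record.
    Ports are taken from the TCP header when the first frame's ip.proto is
    '6', otherwise from the UDP header.

    Args:
        signature_object: GetSignature_2402 extractor.
        stream_dict: the stream's result record (mutated).
        frame_signature_list: per-frame field dicts belonging to this stream;
            only its first element is inspected directly here.
    """
    # common
    stream_dict["common.server_fqdn"] = signature_object.ssl_extensions_server_name(frame_signature_list)
    stream_dict["common.app_id"] = ['unknow']
    # An empty frame list cannot name its transport; fall through to the UDP
    # accessors rather than raising IndexError on the first element.
    is_tcp = bool(frame_signature_list) and frame_signature_list[0]['ip.proto'] == '6'
    if is_tcp:
        stream_dict["srcport"] = signature_object.tcp_srcport(frame_signature_list)
        stream_dict["dstport"] = signature_object.tcp_dstport(frame_signature_list)
    else:
        stream_dict["srcport"] = signature_object.udp_srcport(frame_signature_list)
        stream_dict["dstport"] = signature_object.udp_dstport(frame_signature_list)
    # ip
    stream_dict["ip.src"] = signature_object.ip_src(frame_signature_list)
    stream_dict["ip.dst"] = signature_object.ip_dst(frame_signature_list)
    stream_dict["ip.proto"] = signature_object.ip_proto(frame_signature_list)
    stream_dict["heartbeat_flag"] = signature_object.heartbeat_flag(frame_signature_list)
    # dns
    stream_dict["dns.qry.name"] = signature_object.dns_qry_name(frame_signature_list)
    # http
    stream_dict["http.request.full_uri"] = signature_object.http_request_full_uri(frame_signature_list)
    stream_dict["http.request.header"] = signature_object.http_request_header(frame_signature_list)
    stream_dict["http.response.header"] = signature_object.http_response_header(frame_signature_list)
    # ssl
    stream_dict["ssl.handshake.certificate.algorithm_identifier"] = signature_object.ssl_algorithm_identifier(frame_signature_list)
    stream_dict["ssl.handshake.certificate.serial_number"] = signature_object.ssl_serial_number(frame_signature_list)
    stream_dict["ssl.handshake.certificate.issuer_common_name"] = signature_object.ssl_issuer_common_name(frame_signature_list)
    stream_dict["ssl.handshake.certificate.issuer_organization_name"] = signature_object.ssl_issuer_organization_name(frame_signature_list)
    stream_dict["ssl.handshake.certificate.issuer_country_name"] = signature_object.ssl_issuer_country_name(frame_signature_list)
    stream_dict["ssl.handshake.certificate.subject_common_name"] = signature_object.ssl_subject_common_name(frame_signature_list)
    stream_dict["ssl.handshake.certificate.subject_organization_name"] = signature_object.ssl_subject_organization_name(frame_signature_list)
    stream_dict["ssl.handshake.certificate.subject_country_name"] = signature_object.ssl_subject_country_name(frame_signature_list)
    stream_dict["ssl.handshake.certificate.not_valid_before"] = signature_object.ssl_not_valid_before(frame_signature_list)
    stream_dict["ssl.handshake.certificate.not_valid_after"] = signature_object.ssl_not_valid_after(frame_signature_list)
    stream_dict["ssl.handshake.certificate.algorithm_id"] = signature_object.ssl_algorithm_id(frame_signature_list)
    stream_dict["ssl.analysis.ja3"] = signature_object.ssl_ja3(frame_signature_list)
    stream_dict["ssl.analysis.sni_absent"] = signature_object.sni_absent(frame_signature_list)
    stream_dict["ssl.analysis.ech_enabled"] = signature_object.ssl_ech_enabled(frame_signature_list)
    stream_dict["ssl.analysis.esni_enabled"] = signature_object.ssl_analysis_esni_enabled(frame_signature_list)


def _analyse_streams(signature_object, stream_info_list, get_frame_signatures, proto_analysis):
    """Fill in every stream record of *stream_info_list* in place.

    Args:
        signature_object: GetSignature_2402 extractor.
        stream_info_list: stream records as returned by Get{TCP,UDP}StreamBaseInfo;
            each must carry a 'StreamID' key.
        get_frame_signatures: callable mapping a StreamID to that stream's
            per-frame field dicts.
        proto_analysis: TcpDataFlowAnalysis or UdpDataFlowAnalysis.
    """
    for stream_info in stream_info_list:
        # Collect every frame of this stream, then merge their fields into the record.
        frame_signature_list = get_frame_signatures(stream_info['StreamID'])
        proto_analysis(signature_object, stream_info, frame_signature_list)
        GeneralDataFlowAnalysis(signature_object, stream_info, frame_signature_list)


def _write_output(payload):
    """Write *payload* to OUTPUT_FILE as indented, non-ASCII-preserving JSON."""
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        json.dump(payload, f, ensure_ascii=False, indent=4)


def main(argv=None):
    """Run the extraction for the pcap named on the command line.

    Args:
        argv: argument list without the program name; defaults to sys.argv[1:].

    Returns:
        Process exit status: 0 on success, 2 on a usage error, 1 when the
        input is not a readable capture (an error record is still written to
        OUTPUT_FILE in that case so consumers of the JSON see why).
    """
    args = sys.argv[1:] if argv is None else list(argv)
    # Get the pcap file name from the command line
    if not args:
        logging.error("Please enter the correct parameters !!")
        return 2
    pcap_file_path = args[0]
    if not IsPcapFile(pcap_file_path):
        logging.error("The input is not a pcap file !!")
        _write_output({'Error': 'The input is not a pcap file'})
        return 1

    # Creation signature extraction object
    signature_object = GetSignature_2402.GetStreamSignatureFromTshrak(pcap_file_path)
    # NOTE(review): reads a private attribute of the extractor; presumably the
    # per-frame field dicts parsed from the tshark output — confirm in
    # GetSignature_2402 before relying on it.
    all_frame_signature_dict_list = signature_object._output_dict_list

    # TCP streams: basic info first, then payload and general fields per stream
    tcp_stream_info_list = GetSignature_2402.GetTCPStreamBaseInfo(all_frame_signature_dict_list)
    _analyse_streams(signature_object, tcp_stream_info_list,
                     signature_object.GetOneTcpFrameSignatureList, TcpDataFlowAnalysis)

    # UDP streams, same procedure
    udp_stream_info_list = GetSignature_2402.GetUDPStreamBaseInfo(all_frame_signature_dict_list)
    _analyse_streams(signature_object, udp_stream_info_list,
                     signature_object.GetOneUdpFrameSignatureList, UdpDataFlowAnalysis)

    # Merge all stream records and write them out as JSON
    _write_output(tcp_stream_info_list + udp_stream_info_list)
    return 0


if __name__ == "__main__":
    sys.exit(main())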