#!/usr/bin/evn python # -*- coding:utf-8 -*- from datetime import datetime import subprocess import codecs import binascii from cryptography import x509 from cryptography.hazmat.backends import default_backend # algorithm_identifier and algorithm_id mapping relationship dictionary algorithm_identifier_dict = { "1.3.14.3.2.2" : "md4WithRSA", "1.3.14.3.2.3" : "md5WithRSA", "1.3.14.3.2.4" : "md4WithRSAEncryption", "1.3.14.3.2.6" : "desECB", "1.3.14.3.2.11" : "rsaSignature", "1.3.14.3.2.14" : "mdc2WithRSASignature", "1.3.14.3.2.15" : "shaWithRSASignature", "1.3.14.3.2.16" : "dhWithCommonModulus", "1.3.14.3.2.17" : "desEDE", "1.3.14.3.2.18" : "sha", "1.3.14.3.2.19" : "mdc-2", "1.3.14.3.2.20" : "dsaCommon", "1.3.14.3.2.21" : "dsaCommonWithSHA", "1.3.14.3.2.22" : "rsaKeyTransport", "1.3.14.3.2.23" : "keyed-hash-seal", "1.3.14.3.2.24" : "md2WithRSASignature", "1.3.14.3.2.25" : "md5WithRSASignature", "1.3.14.3.2.26" : "SHA-1", "1.3.14.3.2.27" : "dsaWithSHA1", "1.3.14.3.2.28" : "dsaWithCommonSHA1", "1.3.14.3.2.29" : "sha-1WithRSAEncryption", "1.2.840.10040.4.1" : "id-dsa", "1.2.840.10046.2.1" : "dhpublicnumber", "2.16.840.1.101.2.1.1.22" : "id-keyExchangeAlgorithm", "1.2.840.10045.2.1" : "id-ecPublicKey", "1.3.132.1.12" : "id-ecDH", "1.2.840.10045.2.13" : "id-ecMQV", "1.2.840.113549.1.1.10" : "id-RSASSA-PSS", "1.2.840.113549.1.1.8" : "id-mgf1", "1.2.840.10045.1.1" : "prime-field", "1.2.840.113549.2.2" : "md2", "1.2.840.113549.2.4" : "md4", "1.2.840.113549.2.5" : "md5", "1.2.840.113549.1.1.1" : "rsaEncryption", "1.2.840.113549.1.1.2" : "md2WithRSAEncryption", "1.2.840.113549.1.1.3" : "md4WithRSAEncryption", "1.2.840.113549.1.1.4" : "md5WithRSAEncryption", "1.2.840.113549.1.1.5" : "sha1WithRSAEncryption", "1.2.840.113549.1.1.6" : "rsaOAEPEncryptionSET", "1.2.840.113549.1.1.11" : "sha256WithRSAEncryption", "1.2.840.113549.1.1.12" : "sha384WithRSAEncryption", "1.2.840.113549.1.1.13" : "sha512WithRSAEncryption", "1.2.840.113549.1.1.14" : "sha224WithRSAEncryption", 
"1.2.840.10045.4.1" : "ecdsa-with-SHA1", "1.2.156.10197.1.501" : "SM2-with-SM3", "1.2.840.10045.4.3.1" : "ecdsa-with-SHA224", "1.2.840.10045.4.3.2" : "ecdsa-with-SHA256", "1.2.840.10045.4.3.3" : "ecdsa-with-SHA384", "1.2.840.10045.4.3.4" : "ecdsa-with-SHA512", "2.16.840.1.101.3.4.3.1" : "id-dsa-with-sha224", "2.16.840.1.101.3.4.3.2" : "id-dsa-with-sha256", "1.2.840.10045.3.1.1" : "secp192r1", "1.3.132.0.1" : "sect163k1", "1.3.132.0.15" : "sect163r2", "1.3.132.0.33" : "secp224r1", "1.3.132.0.26" : "sect233k1", "1.3.132.0.27" : "sect233r1", "1.2.840.10045.3.1.7" : "secp256r1", "1.3.132.0.16" : "sect283k1", "1.3.132.0.17" : "sect283r1", "1.3.132.0.34" : "secp384r1", "1.3.132.0.36" : "sect409k1", "1.3.132.0.37" : "sect409r1", "1.3.132.0.35" : "secp521r1", "1.3.132.0.38" : "sect571k1", "1.3.132.0.39" : "sect571r1", "1.2.156.10197.1.301" : "sm2", "2.16.840.1.101.3.4.2.1" : "sha256" , "2.16.840.1.101.3.4.2.2" : "sha384" , "2.16.840.1.101.3.4.2.3" : "sha512" , "2.16.840.1.101.3.4.2.4" : "sha224" , "1.2.156.10197.1.401" : "sm3", "1.3.6.1.4.1.2.267.7.4.4" : "dilithium2" , "1.3.9999.2.7.1" : "p256_dilithium2" , "1.3.9999.2.7.2" : "rsa3072_dilithium2" , "1.3.6.1.4.1.2.267.7.6.5" : "dilithium3" , "1.3.9999.2.7.3" : "p384_dilithium3" , "1.3.6.1.4.1.2.267.7.8.7" : "dilithium5" , "1.3.9999.2.7.4" : "p521_dilithium5" , "1.3.6.1.4.1.2.267.11.4.4" : "dilithium2_aes" , "1.3.9999.2.11.1" : "p256_dilithium2_aes" , "1.3.9999.2.11.2" : "rsa3072_dilithium2_aes" , "1.3.6.1.4.1.2.267.11.6.5" : "dilithium3_aes" , "1.3.9999.2.11.3" : "p384_dilithium3_aes" , "1.3.6.1.4.1.2.267.11.8.7" : "dilithium5_aes" , "1.3.9999.2.11.4" : "p521_dilithium5_aes" , "1.3.9999.3.1" : "falcon512" , "1.3.9999.3.2" : "p256_falcon512" , "1.3.9999.3.3" : "rsa3072_falcon512" , "1.3.9999.3.4" : "falcon1024" , "1.3.9999.3.5" : "p521_falcon1024" , "1.3.6.1.4.1.311.89.2.1.7" : "picnicl1full" , "1.3.6.1.4.1.311.89.2.1.8" : "p256_picnicl1full" , "1.3.6.1.4.1.311.89.2.1.9" : "rsa3072_picnicl1full" , 
"1.3.6.1.4.1.311.89.2.1.21" : "picnic3l1" , "1.3.6.1.4.1.311.89.2.1.22" : "p256_picnic3l1" , "1.3.6.1.4.1.311.89.2.1.23" : "rsa3072_picnic3l1" , "1.3.9999.5.1.1.1" : "rainbowIclassic" , "1.3.9999.5.1.2.1" : "p256_rainbowIclassic" , "1.3.9999.5.1.3.1" : "rsa3072_rainbowIclassic" , "1.3.9999.5.3.1.1" : "rainbowVclassic" , "1.3.9999.5.3.2.1" : "p521_rainbowVclassic" , "1.3.9999.6.1.1" : "sphincsharaka128frobust" , "1.3.9999.6.1.2" : "p256_sphincsharaka128frobust" , "1.3.9999.6.1.3" : "rsa3072_sphincsharaka128frobust" , "1.3.9999.6.4.1" : "sphincssha256128frobust" , "1.3.9999.6.4.2" : "p256_sphincssha256128frobust" , "1.3.9999.6.4.3" : "rsa3072_sphincssha256128frobust" , "1.3.9999.6.7.1" : "sphincsshake256128frobust" , "1.3.9999.6.7.2" : "p256_sphincsshake256128frobust" , "1.3.9999.6.7.3" : "rsa3072_sphincsshake256128frobust" } # http header parsing def HexToHttpHeaders(hex_string): byte_data = bytes.fromhex(hex_string) http_headers_str = byte_data.decode('utf-8') lines = http_headers_str.split('\r\n') headers = {} for line in lines: if ': ' in line: key, value = line.split(': ', 1) headers[key] = value return headers # Other process running commands def RunCommand(command): ret_str = subprocess.run(command, stdout=subprocess.PIPE, shell=True) decoded_result = codecs.decode(ret_str.stdout, 'utf-8') list_1 = decoded_result.splitlines() list_output = [] for i in range(0, len(list_1)) : list_2 = list_1[i].split("\t") list_output.append(list_2) return list_output # Get data stream signature class class GetStreamSignatureFromTshrak (): def __init__(self, pacp_file_path): # Define all fields to be output using tshark tshark_filter_list = [ 'frame.number' , 'frame.time', 'tcp.stream', 'udp.stream', 'ip.src' , 'ip.dst' , 'ip.proto' , 'data' , 'udp.srcport' , 'udp.dstport' , 'udp.payload' , 'udp.length' , 'tcp.srcport' , 'tcp.dstport' , 'tcp.payload' , 'tcp.len' , 'tcp.flags.syn' , 'tcp.flags.ack' , 'tcp.flags.reset', 'tcp.flags.fin', 'tcp.window_size_value', 
'tls.record.content_type', 'tls.handshake.extensions_server_name' , 'tls.handshake.session_id', 'tls.handshake.type', 'tls.handshake.ja3' , 'tls.handshake.certificate' , 'tls.esni.encrypted_sni', 'x509af.serialNumber' , 'x509af.algorithm.id' , 'x509ce.dNSName', 'dns.qry.name' , 'http.request', 'http.response', 'http.request.full_uri', 'http.file_data', 'quic' ] # Piece tshark commands tshark_filter_str = '' for i in range(len(tshark_filter_list)): tshark_filter_str += ' -e ' + tshark_filter_list[i] output_list = RunCommand("tshark -r " + pacp_file_path + " -T fields " + tshark_filter_str) # All tshark output fields are stored in the dictionary self._output_dict_list = [] for i in range(len(output_list)): frame_output_dict = {} for j in range(len(output_list[i])): frame_output_dict[tshark_filter_list[j]] = output_list[i][j] self._output_dict_list.append(frame_output_dict) return # Get a list of TCP stream IDs def GetTcpStreamIdList(self): tcp_stream_ID_list = [] for i in range(len(self._output_dict_list)): if self._output_dict_list[i]['tcp.stream'] != '': if self._output_dict_list[i]['tcp.stream'] not in tcp_stream_ID_list: tcp_stream_ID_list.append(self._output_dict_list[i]['tcp.stream']) return tcp_stream_ID_list # Get a list of UDP stream IDs def GetUdpStreamIdList(self): udp_stream_ID_list = [] for i in range(len(self._output_dict_list)): if self._output_dict_list[i]['udp.stream'] != '': if self._output_dict_list[i]['udp.stream'] not in udp_stream_ID_list: udp_stream_ID_list.append(self._output_dict_list[i]['udp.stream']) return udp_stream_ID_list # Get the feature list of all frames in the specified TCP stream def GetOneTcpFrameSignatureList(self, frame_ID): tcp_frame_signature_list = [] for i in range(len(self._output_dict_list)): if self._output_dict_list[i]['tcp.stream'] == str(frame_ID): tcp_frame_signature_list.append(self._output_dict_list[i]) return tcp_frame_signature_list # Get the feature list of all frames in the specified UDP stream def 
# Get the feature list of all frames in the specified UDP stream
def GetOneUdpFrameSignatureList(self, frame_ID):
    """Return the frame dicts whose ``udp.stream`` equals ``str(frame_ID)``.

    Args:
        frame_ID: Value compared (after ``str``) with each frame's
            ``udp.stream`` field, i.e. a stream index.
    """
    wanted_stream = str(frame_ID)
    return [frame for frame in self._output_dict_list
            if frame.get('udp.stream') == wanted_stream]


# Determine TCP client IP and server IP
def GetTcpClinetAndServerIP(self, frame_signature_list):
    """Guess which endpoint of a TCP stream is the client.

    If the first three frames form a three-way handshake
    (SYN, SYN+ACK, ACK) the sender of the first frame is the client.
    Otherwise the sender of the first frame is taken as the client only
    when its source port is greater than 5000.

    Args:
        frame_signature_list: Frame dicts of one TCP stream, in order.

    Returns:
        ``[client_ip, client_port, server_ip, server_port]``; four empty
        strings when the list is empty.

    Raises:
        ValueError: If no handshake was seen and ``tcp.srcport`` of the
            first frame is not an integer.
    """
    if not frame_signature_list:
        return ['', '', '', '']

    # (tcp.flags.syn, tcp.flags.ack) of the first, second and third handshake
    expected_flags = (('1', '0'), ('1', '1'), ('0', '1'))
    handshake_captured = len(frame_signature_list) >= 3 and all(
        (frame['tcp.flags.syn'], frame['tcp.flags.ack']) == flags
        for frame, flags in zip(frame_signature_list, expected_flags)
    )

    first = frame_signature_list[0]
    # Without a captured handshake, use a port greater than 5000 as the client
    if handshake_captured or int(first['tcp.srcport']) > 5000:
        return [first['ip.src'], first['tcp.srcport'],
                first['ip.dst'], first['tcp.dstport']]
    return [first['ip.dst'], first['tcp.dstport'],
            first['ip.src'], first['tcp.srcport']]


# Determine UDP client IP and server IP
def GetUdpClinetAndServerIP(self, frame_signature_list):
    """Guess which endpoint of a UDP stream is the client.

    The side of the first frame with the larger port number is taken as the
    client (the source wins a tie).

    Args:
        frame_signature_list: Frame dicts of one UDP stream, in order.

    Returns:
        ``[client_ip, client_port, server_ip, server_port]``; four empty
        strings when the list is empty.

    Raises:
        ValueError: If a port of the first frame is not an integer.
    """
    if not frame_signature_list:
        return ['', '', '', '']

    first = frame_signature_list[0]
    # The client port is larger than the server port
    if int(first['udp.srcport']) >= int(first['udp.dstport']):
        return [first['ip.src'], first['udp.srcport'],
                first['ip.dst'], first['udp.dstport']]
    return [first['ip.dst'], first['udp.dstport'],
            first['ip.src'], first['udp.srcport']]


# Get the valid characters in the payload
def GetValidCharactersFromPayload(self, payload):
    """Find long runs of printable characters in a hex-encoded payload.

    A run is reported when ``end - start > 16`` and the ``repr`` of its
    bytes contains no ``\\x`` escape.

    Fixes over the previous version: a run that reaches the last byte is
    now reported (its end index used to be stale), and the hex excerpt is
    cut with hex-digit offsets (two digits per byte) instead of byte
    offsets.

    Args:
        payload: Payload bytes as plain hexadecimal text (no separators).

    Returns:
        list of ``[start, end, hex_excerpt, text]`` entries, where *start*
        and *end* are inclusive byte offsets, *hex_excerpt* is the matching
        slice of *payload* and *text* the bytes decoded as UTF-8 with
        undecodable bytes dropped.

    Raises:
        binascii.Error: If *payload* is not valid hexadecimal.
    """
    binary_data = binascii.unhexlify(payload)
    total = len(binary_data)
    found_runs = []
    run_start = None

    # Index ``total`` acts as a non-printable sentinel that closes a run
    # extending to the very end of the data.
    for index in range(total + 1):
        if index < total and chr(binary_data[index]).isprintable():
            if run_start is None:
                run_start = index
            continue
        if run_start is None:
            continue

        run_end = index - 1
        chunk = binary_data[run_start:run_end + 1]
        if run_end - run_start > 16 and "\\x" not in repr(chunk):
            found_runs.append([
                run_start,
                run_end,
                payload[2 * run_start:2 * (run_end + 1)],
                chunk.decode('utf-8', errors='ignore'),
            ])
        run_start = None

    return found_runs
def _unique_field_values(frame_signature_list, field, skip_empty=True):
    """Return the distinct values of *field* over the frames, first-seen order.

    A frame that lacks *field* counts as holding ''. Empty values are left
    out unless *skip_empty* is false.
    """
    seen = set()
    values = []
    for frame in frame_signature_list:
        value = frame.get(field, '')
        if (skip_empty and value == '') or value in seen:
            continue
        seen.add(value)
        values.append(value)
    return values


def _unique_split_values(frame_signature_list, field):
    """Return the distinct comma-separated items of *field*, first-seen order."""
    seen = set()
    items = []
    for frame in frame_signature_list:
        raw_value = frame.get(field, '')
        if raw_value == '':
            continue
        for item in raw_value.split(','):
            if item not in seen:
                seen.add(item)
                items.append(item)
    return items


####################################### ssl #######################################
# sni_absent
def sni_absent(self, frame_signature_list):
    """Tell whether no frame carries a TLS server name.

    Returns:
        ``['Ture']`` when every ``tls.handshake.extensions_server_name`` is
        empty, else ``['False']``.
    """
    # NOTE(review): 'Ture' looks like a typo of 'True'; kept because callers
    # outside this view may compare against the exact string.
    has_server_name = any(
        frame.get('tls.handshake.extensions_server_name', '') != ''
        for frame in frame_signature_list)
    return ['False'] if has_server_name else ['Ture']


# ssl_ech_enabled tls.handshake.type == 8
def ssl_ech_enabled(self, frame_signature_list):
    """Tell whether any frame contains a TLS handshake message of type 8.

    ``tls.handshake.type`` is read as a comma-separated list (as
    ``ssl_use_selfsigned_certificate`` does), so a type 8 that shares a
    frame with other handshake messages is detected too.

    Returns:
        ``['Ture']`` when type 8 was seen, else ``['False']``.
    """
    # NOTE(review): 'Ture' looks like a typo of 'True'; kept for callers.
    seen_type_8 = any(
        '8' in frame.get('tls.handshake.type', '').split(',')
        for frame in frame_signature_list)
    return ['Ture'] if seen_type_8 else ['False']


# ssl_use_session_resumption tls.handshake.session_id
def ssl_use_session_resumption(self, frame_signature_list):
    """Classify the stream by the presence of a TLS session id.

    Returns:
        ``["use_session_resumption"]`` when no frame has a non-empty
        ``tls.handshake.session_id``, else ``["new_link"]``.
    """
    has_session_id = any(
        frame.get('tls.handshake.session_id', '') != ''
        for frame in frame_signature_list)
    return ["new_link"] if has_session_id else ["use_session_resumption"]


# ssl_use_selfsigned_certificate
#   tls.handshake.type == 13  Certificate Request
#   tls.handshake.type == 15  Certificate Verify
# A self-signed certificate has no chain issued by an authority, so the TLS
# handshake may lack the "Certificate Request" and "Certificate Verify"
# stages. After "Server Hello Done" and before the client's "Client Key
# Exchange" one can check for a "Certificate Request" message; if there is
# none, the certificate may be self-signed.
def ssl_use_selfsigned_certificate(self, frame_signature_list):
    """Apply the handshake-type heuristic described above.

    Returns:
        ``["Authoritative certificate"]`` when both handshake types 13 and
        15 occur in the stream, else ``["use_selfsigned_certificate"]``.
    """
    handshake_types = _unique_split_values(frame_signature_list,
                                           'tls.handshake.type')
    if '13' in handshake_types and '15' in handshake_types:
        return ["Authoritative certificate"]
    return ["use_selfsigned_certificate"]


# ssl_ja3
def ssl_ja3(self, frame_signature_list):
    """Return the distinct non-empty ``tls.handshake.ja3`` values."""
    return _unique_field_values(frame_signature_list, 'tls.handshake.ja3')


# ssl_extensions_server_name
def ssl_extensions_server_name(self, frame_signature_list):
    """Return the distinct TLS server names of frames whose ``quic`` is not '1'."""
    non_quic_frames = [frame for frame in frame_signature_list
                       if frame.get('quic', '') != '1']
    return _unique_field_values(non_quic_frames,
                                'tls.handshake.extensions_server_name')


# ssl_fingerprint tls.handshake.certificate
def ssl_fingerprint(self, frame_signature_list):
    """Return the distinct non-empty ``tls.handshake.certificate`` values."""
    return _unique_field_values(frame_signature_list,
                                'tls.handshake.certificate')


# ssl_serial_number x509af.serialNumber
def ssl_serial_number(self, frame_signature_list):
    """Return the distinct non-empty ``x509af.serialNumber`` values."""
    return _unique_field_values(frame_signature_list, 'x509af.serialNumber')
def _certificate_name_values(frame_signature_list, name_attribute, oid_name,
                             first_frame_only=False):
    """Read one name attribute from every certificate seen in the frames.

    Args:
        frame_signature_list: Frame dicts; ``tls.handshake.certificate`` is
            read as a comma-separated list of hex-encoded DER certificates.
        name_attribute: ``'issuer'`` or ``'subject'``.
        oid_name: Attribute name on ``x509.NameOID`` (e.g. ``'COMMON_NAME'``).
        first_frame_only: Stop collecting after the first frame that
            carries certificates.

    Returns:
        One entry per certificate: the value of the last matching name
        attribute, or ``None`` when the certificate has none.
    """
    certificate_hex_list = []
    for frame in frame_signature_list:
        field_value = frame.get('tls.handshake.certificate', '')
        if field_value != '':
            certificate_hex_list.extend(field_value.split(','))
            if first_frame_only:
                break

    values = []
    for certificate_hex in certificate_hex_list:
        # Parse the certificate from its DER bytes
        cert = x509.load_der_x509_certificate(bytes.fromhex(certificate_hex),
                                              default_backend())
        wanted_oid = getattr(x509.NameOID, oid_name)
        found_value = None
        for attribute in getattr(cert, name_attribute):
            if attribute.oid == wanted_oid:
                found_value = attribute.value
        # NOTE(review): the original indentation was lost; one entry per
        # certificate (None when the attribute is absent) is assumed - confirm.
        values.append(found_value)
    return values


# ssl_issuer_common_name
def ssl_issuer_common_name(self, frame_signature_list):
    """Return the issuer common name of every certificate in the stream."""
    return _certificate_name_values(frame_signature_list, 'issuer',
                                    'COMMON_NAME')


# ssl_issuer_organization_name RFC 4519 2.5.4.10
def ssl_issuer_organization_name(self, frame_signature_list):
    """Return the issuer organization name of every certificate in the stream."""
    return _certificate_name_values(frame_signature_list, 'issuer',
                                    'ORGANIZATION_NAME')


# ssl_issuer_country_name RFC 4519 2.5.4.6
def ssl_issuer_country_name(self, frame_signature_list):
    """Return the issuer country name of the certificates in the first
    certificate-bearing frame."""
    # NOTE(review): unlike the two issuer methods above, the original stops
    # at the first frame with certificates (a `break`); kept as-is - confirm
    # whether the difference is intended.
    return _certificate_name_values(frame_signature_list, 'issuer',
                                    'COUNTRY_NAME', first_frame_only=True)


# ssl_subject_common_name
def ssl_subject_common_name(self, frame_signature_list):
    """Return the subject common name of the certificates in the first
    certificate-bearing frame."""
    # NOTE(review): stops at the first frame with certificates, as the
    # original did - confirm whether that is intended.
    return _certificate_name_values(frame_signature_list, 'subject',
                                    'COMMON_NAME', first_frame_only=True)


# ssl_subject_organization_name
def ssl_subject_organization_name(self, frame_signature_list):
    """Return the subject organization name of every certificate in the stream."""
    return _certificate_name_values(frame_signature_list, 'subject',
                                    'ORGANIZATION_NAME')
def _certificate_hex_values(frame_signature_list):
    """Return every hex-encoded certificate of the frames, in order.

    ``tls.handshake.certificate`` is read as a comma-separated list.
    """
    certificate_hex_list = []
    for frame in frame_signature_list:
        field_value = frame.get('tls.handshake.certificate', '')
        if field_value != '':
            certificate_hex_list.extend(field_value.split(','))
    return certificate_hex_list


def _certificate_validity_strings(frame_signature_list, attribute_name):
    """Format one validity bound of every certificate as 'YYYY-MM-DD HH:MM:SS'.

    *attribute_name* is ``'not_valid_before'`` or ``'not_valid_after'``. The
    ``<name>_utc`` accessor is preferred when the installed cryptography
    release offers it, because the plain attributes are deprecated there.
    """
    formatted = []
    for certificate_hex in _certificate_hex_values(frame_signature_list):
        # Parse the certificate from its DER bytes
        cert = x509.load_der_x509_certificate(bytes.fromhex(certificate_hex),
                                              default_backend())
        moment = getattr(cert, attribute_name + '_utc', None)
        if moment is None:
            moment = getattr(cert, attribute_name)
        formatted.append(moment.strftime("%Y-%m-%d %H:%M:%S"))
    return formatted


def _unique_field_values(frame_signature_list, field, skip_empty=True):
    """Return the distinct values of *field* over the frames, first-seen order.

    A frame that lacks *field* counts as holding ''. Empty values are left
    out unless *skip_empty* is false.
    """
    seen = set()
    values = []
    for frame in frame_signature_list:
        value = frame.get(field, '')
        if (skip_empty and value == '') or value in seen:
            continue
        seen.add(value)
        values.append(value)
    return values


def _unique_split_values(frame_signature_list, field):
    """Return the distinct comma-separated items of *field*, first-seen order."""
    seen = set()
    items = []
    for frame in frame_signature_list:
        raw_value = frame.get(field, '')
        if raw_value == '':
            continue
        for item in raw_value.split(','):
            if item not in seen:
                seen.add(item)
                items.append(item)
    return items


# ssl_subject_country_name
def ssl_subject_country_name(self, frame_signature_list):
    """Return the subject country name of every certificate in the stream.

    Returns:
        One entry per certificate: the value of its last COUNTRY_NAME
        subject attribute, or ``None`` when it has none.
    """
    values = []
    for certificate_hex in _certificate_hex_values(frame_signature_list):
        # Parse the certificate from its DER bytes
        cert = x509.load_der_x509_certificate(bytes.fromhex(certificate_hex),
                                              default_backend())
        country_name = None
        for attribute in cert.subject:
            if attribute.oid == x509.NameOID.COUNTRY_NAME:
                country_name = attribute.value
        # NOTE(review): the original indentation was lost; one entry per
        # certificate (None when absent) is assumed - confirm.
        values.append(country_name)
    return values


# ssl_not_valid_before
def ssl_not_valid_before(self, frame_signature_list):
    """Return the start of validity of every certificate, formatted as
    ``YYYY-MM-DD HH:MM:SS``."""
    return _certificate_validity_strings(frame_signature_list,
                                         'not_valid_before')


# ssl_not_valid_after
def ssl_not_valid_after(self, frame_signature_list):
    """Return the end of validity (expiry) of every certificate, formatted as
    ``YYYY-MM-DD HH:MM:SS``."""
    return _certificate_validity_strings(frame_signature_list,
                                         'not_valid_after')


# ssl_algorithm_id x509af.algorithm.id
def ssl_algorithm_id(self, frame_signature_list):
    """Return the distinct algorithm OIDs found in ``x509af.algorithm.id``.

    The field is read as a comma-separated list; order is first-seen.
    """
    return _unique_split_values(frame_signature_list, 'x509af.algorithm.id')


def ssl_algorithm_identifier(self, frame_signature_list):
    """Return the distinct algorithm names for the OIDs of the stream.

    Each OID from ``x509af.algorithm.id`` is looked up in
    ``algorithm_identifier_dict``; OIDs missing from the table map to
    ``'unknown'``. Order is first-seen.
    """
    seen = set()
    names = []
    for oid in _unique_split_values(frame_signature_list,
                                    'x509af.algorithm.id'):
        name = algorithm_identifier_dict.get(oid, 'unknown')
        if name not in seen:
            seen.add(name)
            names.append(name)
    return names


# ssl_san 'x509ce.dNSName'
def ssl_san(self, frame_signature_list):
    """Return the distinct non-empty ``x509ce.dNSName`` field values.

    The field value of a frame is kept whole (it is not split on commas).
    """
    return _unique_field_values(frame_signature_list, 'x509ce.dNSName')


# heartbeat_flag
def heartbeat_flag(self, frame_signature_list):
    """Return ``frame.number`` of every frame whose
    ``tls.record.content_type`` is exactly ``'24'``."""
    return [frame['frame.number'] for frame in frame_signature_list
            if frame.get('tls.record.content_type', '') == '24']


# tls.esni.encrypted_sni
def ssl_analysis_esni_enabled (self, frame_signature_list):
    """Tell whether any frame has a non-empty ``tls.esni.encrypted_sni``.

    Returns:
        ``['Ture']`` when one was seen, else ``['False']``.
    """
    # NOTE(review): 'Ture' looks like a typo of 'True'; kept because callers
    # outside this view may compare against the exact string.
    has_esni = any(frame.get('tls.esni.encrypted_sni', '') != ''
                   for frame in frame_signature_list)
    return ['Ture'] if has_esni else ['False']
def _unique_field_values(frame_signature_list, field, skip_empty=True):
    """Return the distinct values of *field* over the frames, first-seen order.

    A frame that lacks *field* counts as holding ''. Empty values are left
    out unless *skip_empty* is false.
    """
    seen = set()
    values = []
    for frame in frame_signature_list:
        value = frame.get(field, '')
        if (skip_empty and value == '') or value in seen:
            continue
        seen.add(value)
        values.append(value)
    return values


####################################### udp #######################################
# udp_c2s_first_data
def udp_c2s_first_data(self, frame_signature_list):
    """Return ``[udp.payload]`` of the first frame sent by the client.

    The client address is element 0 of ``self.GetUdpClinetAndServerIP``.
    Returns ``[]`` when the list is empty or no frame comes from the client.
    """
    if not frame_signature_list:
        return []
    client_ip = self.GetUdpClinetAndServerIP(frame_signature_list)[0]
    for frame in frame_signature_list:
        if frame['ip.src'] == client_ip:
            return [frame['udp.payload']]
    return []


# udp_s2c_first_data
def udp_s2c_first_data(self, frame_signature_list):
    """Return ``[udp.payload]`` of the first frame sent by the server.

    The server address is element 2 of ``self.GetUdpClinetAndServerIP``.
    Returns ``[]`` when the list is empty or no frame comes from the server.
    """
    if not frame_signature_list:
        return []
    server_ip = self.GetUdpClinetAndServerIP(frame_signature_list)[2]
    for frame in frame_signature_list:
        if frame['ip.src'] == server_ip:
            return [frame['udp.payload']]
    return []


# udp_c2s_first_data_len
def udp_c2s_first_data_len(self, frame_signature_list):
    """Return the length of the first client payload as ``[str]``.

    The length is that of the payload text as stored in the frame dict;
    ``["0"]`` when there is no client frame.
    """
    first_data = self.udp_c2s_first_data(frame_signature_list)
    return [str(len(first_data[0]))] if first_data else ["0"]


# udp_s2c_first_data_len
def udp_s2c_first_data_len(self, frame_signature_list):
    """Return the length of the first server payload as ``[str]``.

    The length is that of the payload text as stored in the frame dict;
    ``["0"]`` when there is no server frame.
    """
    first_data = self.udp_s2c_first_data(frame_signature_list)
    return [str(len(first_data[0]))] if first_data else ["0"]


# udp_get_payload udp.payload
def udp_get_payload(self, frame_signature_list):
    """Return every non-empty ``udp.payload`` in frame order (duplicates kept)."""
    return [frame['udp.payload'] for frame in frame_signature_list
            if frame.get('udp.payload', '') != '']


# udp_srcport
def udp_srcport(self, frame_signature_list):
    """Return the distinct ``udp.srcport`` values (an empty value included)."""
    return _unique_field_values(frame_signature_list, 'udp.srcport',
                                skip_empty=False)


# udp_dstport udp.dstport
def udp_dstport(self, frame_signature_list):
    """Return the distinct ``udp.dstport`` values (an empty value included)."""
    return _unique_field_values(frame_signature_list, 'udp.dstport',
                                skip_empty=False)


####################################### tcp #######################################
# tcp_c2s_first_data
def tcp_c2s_first_data(self, frame_signature_list):
    """Return ``[tcp.payload]`` of the frame right after the handshake.

    That is the fourth frame, and only when ``tcp_create_with_syn`` reports
    a captured handshake; otherwise ``['']``.
    """
    # NOTE(review): presumes the fourth frame is client-to-server - confirm.
    if (len(frame_signature_list) > 3
            and "create_with_syn" in self.tcp_create_with_syn(frame_signature_list)):
        return [frame_signature_list[3]['tcp.payload']]
    return ['']


# tcp_s2c_first_data
def tcp_s2c_first_data(self, frame_signature_list):
    """Return ``[tcp.payload]`` of the second frame after the handshake.

    That is the fifth frame, and only when ``tcp_create_with_syn`` reports
    a captured handshake; otherwise ``['']``.
    """
    # NOTE(review): presumes the fifth frame is server-to-client - confirm.
    if (len(frame_signature_list) > 4
            and "create_with_syn" in self.tcp_create_with_syn(frame_signature_list)):
        return [frame_signature_list[4]['tcp.payload']]
    return ['']


# tcp_c2s_first_data_len
def tcp_c2s_first_data_len(self, frame_signature_list):
    """Return the length of ``tcp_c2s_first_data``'s payload text as ``[str]``."""
    return [str(len(self.tcp_c2s_first_data(frame_signature_list)[0]))]


# tcp_s2c_first_data_len
def tcp_s2c_first_data_len(self, frame_signature_list):
    """Return the length of ``tcp_s2c_first_data``'s payload text as ``[str]``."""
    return [str(len(self.tcp_s2c_first_data(frame_signature_list)[0]))]


# tcp_create_with_syn
def tcp_create_with_syn(self, frame_signature_list):
    """Report whether the stream starts with a three-way handshake.

    Returns:
        ``["TCP NO Create"]`` for fewer than three frames,
        ``["create_with_syn"]`` when the first three frames are
        SYN, SYN+ACK, ACK, else ``["NO create_with_syn"]``.
    """
    if len(frame_signature_list) < 3:
        return ["TCP NO Create"]

    # (tcp.flags.syn, tcp.flags.ack) of the first, second and third handshake
    expected_flags = (('1', '0'), ('1', '1'), ('0', '1'))
    handshake_captured = all(
        (frame['tcp.flags.syn'], frame['tcp.flags.ack']) == flags
        for frame, flags in zip(frame_signature_list, expected_flags)
    )
    return ["create_with_syn"] if handshake_captured else ["NO create_with_syn"]


# tcp_get_payload tcp.payload
def tcp_get_payload(self, frame_signature_list):
    """Return every non-empty ``tcp.payload`` in frame order (duplicates kept).

    The previous version also ran ``GetValidCharactersFromPayload`` on each
    payload and discarded the result; that dead call is removed.
    """
    return [frame['tcp.payload'] for frame in frame_signature_list
            if frame.get('tcp.payload', '') != '']
def _unique_field_values(frame_signature_list, field, skip_empty=True):
    """Return the distinct values of *field* over the frames, first-seen order.

    A frame that lacks *field* counts as holding ''. Empty values are left
    out unless *skip_empty* is false.
    """
    seen = set()
    values = []
    for frame in frame_signature_list:
        value = frame.get(field, '')
        if (skip_empty and value == '') or value in seen:
            continue
        seen.add(value)
        values.append(value)
    return values


# tcp_syn_fingerprint tcp.flags.syn
def tcp_syn_fingerprint(self, frame_signature_list):
    """Return ``[tcp.payload]`` of the first frame that has a payload and
    whose ``tcp.flags.syn`` is not '1'; ``[]`` when there is none."""
    for frame in frame_signature_list:
        if frame.get('tcp.flags.syn', '') != '1' and frame.get('tcp.payload', '') != '':
            return [frame['tcp.payload']]
    return []


# tcp_sack_fingerprint tcp.flags.ack
def tcp_sack_fingerprint(self, frame_signature_list):
    """Return ``[tcp.payload]`` of the first frame that has a payload and
    whose ``tcp.flags.ack`` is not '1'; ``[]`` when there is none."""
    for frame in frame_signature_list:
        if frame.get('tcp.flags.ack', '') != '1' and frame.get('tcp.payload', '') != '':
            return [frame['tcp.payload']]
    return []


# tcp_srcport tcp.srcport
def tcp_srcport(self, frame_signature_list):
    """Return the distinct ``tcp.srcport`` values (an empty value included)."""
    return _unique_field_values(frame_signature_list, 'tcp.srcport',
                                skip_empty=False)


# tcp_dstport tcp.dstport
def tcp_dstport(self, frame_signature_list):
    """Return the distinct ``tcp.dstport`` values (an empty value included)."""
    return _unique_field_values(frame_signature_list, 'tcp.dstport',
                                skip_empty=False)


# tcp_window tcp.window_size_value
def tcp_window(self, frame_signature_list):
    """Return ``[tcp.window_size_value]`` of the last frame.

    Returns ``[]`` for an empty list (this used to raise IndexError).
    """
    if not frame_signature_list:
        return []
    return [frame_signature_list[-1]['tcp.window_size_value']]


####################################### ip #######################################
# ip_payload data
def ip_payload(self, frame_signature_list):
    """Return every non-empty ``data`` value in frame order (duplicates kept)."""
    return [frame['data'] for frame in frame_signature_list
            if frame.get('data', '') != '']


# ip_src
def ip_src(self, frame_signature_list):
    """Return the distinct ``ip.src`` values (an empty value included)."""
    return _unique_field_values(frame_signature_list, 'ip.src',
                                skip_empty=False)


# ip_dst ip.dst
def ip_dst(self, frame_signature_list):
    """Return the distinct ``ip.dst`` values (an empty value included)."""
    return _unique_field_values(frame_signature_list, 'ip.dst',
                                skip_empty=False)


# ip_proto ip.proto
def ip_proto(self, frame_signature_list):
    """Name the transport protocol of the first TCP or UDP frame.

    Returns:
        ``["TCP"]`` for ``ip.proto`` '6', ``["UDP"]`` for '17', decided by
        the first frame carrying either value; ``[]`` when no frame does.

    Side effects:
        The ``ip.proto`` value of every frame examined before the deciding
        one is printed to stdout, as in the previous version.
    """
    protocol_names = {'6': "TCP", '17': "UDP"}
    for frame in frame_signature_list:
        protocol_number = frame.get('ip.proto', '')
        if protocol_number in protocol_names:
            return [protocol_names[protocol_number]]
        print(protocol_number)
    return []


####################################### dns #######################################
# dns_qry_name dns.qry.name
def dns_qry_name(self, frame_signature_list):
    """Return the distinct ``dns.qry.name`` values (an empty value included)."""
    return _unique_field_values(frame_signature_list, 'dns.qry.name',
                                skip_empty=False)


####################################### http #######################################
# http_request_full_uri http.request.full_uri
def http_request_full_uri(self, frame_signature_list):
    """Return the distinct non-empty ``http.request.full_uri`` values."""
    return _unique_field_values(frame_signature_list, 'http.request.full_uri')
def _http_headers_of_flagged_frames(frame_signature_list, flag_field):
    """Parse the HTTP headers of every frame whose *flag_field* is 'True'.

    For each such frame the ``http.file_data`` text is removed from the
    ``tcp.payload`` hex text and the remainder is handed to
    ``HexToHttpHeaders``.
    """
    parsed_headers = []
    for frame in frame_signature_list:
        if frame.get(flag_field, '') != 'True':
            continue
        # NOTE(review): str.replace drops *every* occurrence of the body hex,
        # including ones inside the header bytes or straddling a byte
        # boundary; kept as-is - confirm whether a suffix strip is intended.
        header_hex = frame['tcp.payload'].replace(frame['http.file_data'], "")
        parsed_headers.append(HexToHttpHeaders(header_hex))
    return parsed_headers


# http_request_header http.request
def http_request_header(self, frame_signature_list):
    """Return one header dict per frame whose ``http.request`` is 'True'."""
    return _http_headers_of_flagged_frames(frame_signature_list,
                                           'http.request')


# http_response_header http.response
def http_response_header(self, frame_signature_list):
    """Return one header dict per frame whose ``http.response`` is 'True'."""
    return _http_headers_of_flagged_frames(frame_signature_list,
                                           'http.response')


def MonthToNumber(abbreviation):
    """Map a three-letter English month abbreviation to 1..12.

    Letter case is ignored; ``None`` is returned for anything else.
    """
    months = {
        'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
        'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12,
    }
    return months.get(abbreviation.capitalize())


def RemoveAfterLastSpace(input_string):
    """Drop the last space of *input_string* and everything after it.

    A string without any space is returned unchanged.
    """
    head, separator, _tail = input_string.rpartition(' ')
    return head if separator else input_string


def _frame_time_to_timestamp(frame_time):
    """Convert a ``frame.time`` text to 'YEAR-MONTH-DAY HH:MM:SS.ffffff'.

    The text is expected to read like
    ``'Jan  5, 2024 10:20:30.123456789 CST'``: the last space-separated word
    is dropped, then month name, day (with trailing comma), year and time
    are taken from the first four words. The last three characters of the
    time are cut off so the fraction fits ``%f`` (at most six digits).
    """
    words = RemoveAfterLastSpace(frame_time).split()
    year = words[2]
    month = str(MonthToNumber(words[0]))
    day = words[1].split(',')[0]
    time_of_day = words[3][:-3]
    return year + '-' + month + '-' + day + ' ' + time_of_day


# Get data flow time
def GetDataFlowTime(feature_list):
    """Return start time, end time and duration of a data stream.

    Args:
        feature_list: Frame dicts of one stream in order; ``frame.time`` of
            the first and last entry is used.

    Returns:
        ``(start, end, duration)`` where *start* and *end* are strings
        shaped like ``'2024-1-5 10:20:30.123456'`` and *duration* is the
        difference in seconds as a float.

    Raises:
        IndexError: If *feature_list* is empty or a ``frame.time`` has fewer
            than four words after the last word is dropped.
        ValueError: If a converted time does not parse.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S.%f"

    # Get the start and end time of the data stream
    relative_start_dt = _frame_time_to_timestamp(feature_list[0]['frame.time'])
    relative_end_dt = _frame_time_to_timestamp(feature_list[-1]['frame.time'])

    # Get the duration of a data stream
    started = datetime.strptime(relative_start_dt, timestamp_format)
    ended = datetime.strptime(relative_end_dt, timestamp_format)
    duration_dt = (ended - started).total_seconds()

    return relative_start_dt, relative_end_dt, duration_dt


# Get the basic information dictionary of the data stream
def GetBasicInfoDict(stream_type, stream_ID, start_tag, end_tag, client_iP, client_port, server_iP, server_port, stream_irection,
                     s2c_frames_num, s2c_byte, c2s_frames_num, c2s_byte, total_frames_num, total_byte,
                     relative_start_dt, relative_end_dt, duration_dt):
    """Bundle the basic facts of one stream into a dict.

    Every argument is stored unchanged under a fixed key; the keys appear
    in the order of the parameters.
    """
    return {
        'StreamType': stream_type,
        'StreamID': stream_ID,
        'StartTag': start_tag,
        'EndTag': end_tag,
        'ClientIP': client_iP,
        'ClientPort': client_port,
        'ServerIP': server_iP,
        'ServerPort': server_port,
        'StreamIrection': stream_irection,
        'S2C Num': s2c_frames_num,
        'S2C Byte': s2c_byte,
        'C2S Num': c2s_frames_num,
        'C2S Byte': c2s_byte,
        'TotalNum': total_frames_num,
        'TotalByte': total_byte,
        'Start': relative_start_dt,
        'End': relative_end_dt,
        'Duration': duration_dt,
    }
tcp_stream_ID_list.append(all_frame_signature_dict_list[i]['tcp.stream']) # Extract basic information of TCP data stream for i in range(len(tcp_stream_ID_list)): tcp_feature_list = [] for j in range(len(all_frame_signature_dict_list)): if all_frame_signature_dict_list[j]['tcp.stream'] == tcp_stream_ID_list[i]: tcp_feature_list.append(all_frame_signature_dict_list[j]) stream_type = "TCP" stream_ID = tcp_stream_ID_list[i] # tcp.flags.syn==1 && tcp.flags.ack==0 # tcp.flags.syn==1 && tcp.flags.ack==1 # tcp.flags.syn==0 && tcp.flags.ack==1 if len(tcp_feature_list) >= 3: if tcp_feature_list[0]['tcp.flags.syn'] == '1' and tcp_feature_list[0]['tcp.flags.ack'] == '0' and \ tcp_feature_list[1]['tcp.flags.syn'] == '1' and tcp_feature_list[1]['tcp.flags.ack'] == '1' and \ tcp_feature_list[2]['tcp.flags.syn'] == '0' and tcp_feature_list[2]['tcp.flags.ack'] == '1' : start_tag = 'Normal Handshake' else: start_tag = 'No Handshake' else: start_tag = 'frame_num < 3' # tcp.flags.fin==0 && tcp.flags.ack==1 # tcp.flags.fin==1 && tcp.flags.ack==1 # tcp.flags.fin==0 && tcp.flags.ack==1 # tcp.flags.fin==1 && tcp.flags.ack==0 # Four times wave conditions if len(tcp_feature_list) >= 4: if tcp_feature_list[-1]['tcp.flags.reset'] == '1' : end_tag = 'By Reset' elif tcp_feature_list[-1]['tcp.flags.fin'] == '0' and tcp_feature_list[-1]['tcp.flags.ack'] == '1' and \ tcp_feature_list[-2]['tcp.flags.fin'] == '1' and tcp_feature_list[-2]['tcp.flags.ack'] == '1' and \ tcp_feature_list[-3]['tcp.flags.fin'] == '0' and tcp_feature_list[-3]['tcp.flags.ack'] == '1' and \ tcp_feature_list[-4]['tcp.flags.fin'] == '1' and tcp_feature_list[-4]['tcp.flags.ack'] == '0' : end_tag = 'Normal Wavehand' else: end_tag = 'No Wavehand' else: end_tag = 'frame_num < 4' # Determine the client IP and server IP if start_tag == 'Normal Handshake': client_iP = tcp_feature_list[0]['ip.src'] client_port = tcp_feature_list[0]['tcp.srcport'] server_iP = tcp_feature_list[0]['ip.dst'] server_port = 
tcp_feature_list[0]['tcp.dstport'] else: if int(tcp_feature_list[0]['tcp.srcport']) > 5000: client_iP = tcp_feature_list[0]['ip.src'] client_port = tcp_feature_list[0]['tcp.srcport'] server_iP = tcp_feature_list[0]['ip.dst'] server_port = tcp_feature_list[0]['tcp.dstport'] else: client_iP = tcp_feature_list[0]['ip.dst'] client_port = tcp_feature_list[0]['tcp.dstport'] server_iP = tcp_feature_list[0]['ip.src'] server_port = tcp_feature_list[0]['tcp.srcport'] # Get the data flow direction stream_irection ip_list = [] for j in range(len(tcp_feature_list)): if tcp_feature_list[j]['ip.src'] in ip_list: pass else: ip_list.append(tcp_feature_list[j]['ip.src']) if len(ip_list) > 1: stream_irection = 'double' elif ip_list[0] == '127.0.0.1': stream_irection = 'local' else: if ip_list[0] == client_iP: stream_irection = 'c2s' else: stream_irection = 's2c' # Get data flow in each direction: s2c_frames_num, s2c_byte, c2s_frames_num, c2s_byte, total_frames_num, total_byte s2c_frames_num = 0 s2c_byte = 0 c2s_frames_num = 0 c2s_byte = 0 total_frames_num = len(tcp_feature_list) total_byte = 0 for j in range(len(tcp_feature_list)): if tcp_feature_list[j]['tcp.len'] == '': tcp_len = len(tcp_feature_list[j]['tcp.payload']) else: tcp_len = int(tcp_feature_list[j]['tcp.len']) total_byte += tcp_len if tcp_feature_list[j]['ip.src'] == client_iP: c2s_frames_num += 1 c2s_byte += tcp_len else: s2c_frames_num += 1 s2c_byte += tcp_len # Get the start time and duration of a data stream: relative_start_dt, duration_dt ret_get_data_flow_time = GetDataFlowTime(tcp_feature_list) relative_start_dt = ret_get_data_flow_time[0] relative_end_dt = ret_get_data_flow_time[1] duration_dt = ret_get_data_flow_time[2] tcp_stream_base_dict = GetBasicInfoDict(stream_type, stream_ID, start_tag, end_tag, client_iP, client_port, server_iP, server_port, stream_irection, s2c_frames_num, s2c_byte, c2s_frames_num, c2s_byte, total_frames_num, total_byte, relative_start_dt, relative_end_dt, duration_dt) 
tcp_stream_basic_info_list.append(tcp_stream_base_dict) return tcp_stream_basic_info_list # UDP Obtaining basic information about data streams def GetUDPStreamBaseInfo(all_frame_signature_dict_list): # Extract UDP stream ID list tcp_stream_basic_info_list = [] udp_stream_ID_list = [] for i in range(len(all_frame_signature_dict_list)): if all_frame_signature_dict_list[i]['udp.stream'] != '': if all_frame_signature_dict_list[i]['udp.stream'] not in udp_stream_ID_list: udp_stream_ID_list.append(all_frame_signature_dict_list[i]['udp.stream']) # Extract basic information of udp data stream for i in range(len(udp_stream_ID_list)): udp_feature_list = [] for j in range(len(all_frame_signature_dict_list)): if all_frame_signature_dict_list[j]['udp.stream'] == udp_stream_ID_list[i]: udp_feature_list.append(all_frame_signature_dict_list[j]) stream_type = "UDP" stream_ID = udp_stream_ID_list[i] start_tag = 'Normal' end_tag = 'Normal' # Determine the client IP and server IP. The one with the larger port in UDP is defined as the client. 
if int(udp_feature_list[0]['udp.srcport']) > int(udp_feature_list[0]['udp.dstport']): client_iP = udp_feature_list[0]['ip.src'] client_port = udp_feature_list[0]['udp.srcport'] server_iP = udp_feature_list[0]['ip.dst'] server_port = udp_feature_list[0]['udp.dstport'] else: client_iP = udp_feature_list[0]['ip.dst'] client_port = udp_feature_list[0]['udp.dstport'] server_iP = udp_feature_list[0]['ip.src'] server_port = udp_feature_list[0]['udp.srcport'] # Determine the direction of data flow ip_list = [] for j in range(len(udp_feature_list)): if udp_feature_list[j]['ip.src'] in ip_list: pass else: ip_list.append(udp_feature_list[j]['ip.src']) if len(ip_list) > 1: stream_irection = 'double' elif ip_list[0] == '127.0.0.1': stream_irection = 'local' else: if ip_list[0] == client_iP: stream_irection = 'c2s' else: stream_irection = 's2c' # Get data flow in each direction: s2c_frames_num, s2c_byte, c2s_frames_num, c2s_byte, total_frames_num, total_byte s2c_frames_num = 0 s2c_byte = 0 c2s_frames_num = 0 c2s_byte = 0 total_frames_num = len(udp_feature_list) total_byte = 0 for j in range(len(udp_feature_list)): total_byte += int(udp_feature_list[j]['udp.length']) if udp_feature_list[j]['ip.src'] == client_iP: c2s_frames_num += 1 c2s_byte += int(udp_feature_list[j]['udp.length']) else: s2c_frames_num += 1 s2c_byte += int(udp_feature_list[j]['udp.length']) # Get the start time and duration of a data stream relative_start_dt, duration_dt ret_get_data_flow_time = GetDataFlowTime(udp_feature_list) relative_start_dt = ret_get_data_flow_time[0] relative_end_dt = ret_get_data_flow_time[1] duration_dt = ret_get_data_flow_time[2] udp_stream_base_dict = GetBasicInfoDict(stream_type, stream_ID, start_tag, end_tag, client_iP, client_port, server_iP, server_port, stream_irection, s2c_frames_num, s2c_byte, c2s_frames_num, c2s_byte, total_frames_num, total_byte, relative_start_dt, relative_end_dt, duration_dt) tcp_stream_basic_info_list.append(udp_stream_base_dict) return 
tcp_stream_basic_info_list