#!/usr/bin/env python
# -*- coding:utf-8 -*-
import binascii
import sys
import os
import subprocess
import json
import struct

# First four bytes of a pcapng file: the Section Header Block type field.
_PCAPNG_SHB_MAGIC = bytes.fromhex('0a0d0d0a')


class SectionHeaderBlock_a:
    """Fixed 24-byte leading part of a Section Header Block (SHB).

    ``data`` is sliced positionally into the raw header fields. The block
    length is additionally decoded as a little-endian integer and stored in
    ``_decimal_number``.
    """

    def __init__(self, data):
        self._Block_Type = data[0:4]
        self._Block_Length = data[4:8]
        self._Byte_Order_Magic = data[8:12]
        self._Major_Version = data[12:14]
        self._Minor_Version = data[14:16]
        self._Section_Length = data[16:24]
        # Byte order is hard-coded to little-endian; the byte-order magic
        # field is stored but not consulted here.
        self._decimal_number = int.from_bytes(self._Block_Length, byteorder='little')

    def display_content(self):
        """Print every raw field as hex, then the decoded block length."""
        print(binascii.b2a_hex(self._Block_Type))
        print(binascii.b2a_hex(self._Block_Length))
        print(binascii.b2a_hex(self._Byte_Order_Magic))
        print(binascii.b2a_hex(self._Major_Version))
        print(binascii.b2a_hex(self._Minor_Version))
        print(binascii.b2a_hex(self._Section_Length))
        print(self._decimal_number)


class SectionHeaderBlock_b:
    """Remainder of an SHB after its first 24 bytes.

    ``data`` must start right after the 24-byte fixed part; ``Block_Length``
    is the total block length, so the trailing 4-byte length copy sits at
    ``Block_Length - 28`` relative to ``data``.
    """

    def __init__(self, data, Block_Length):
        self._Options = data[0:Block_Length - 24 - 4]
        self._Block_Length_redundant = data[Block_Length - 24 - 4:Block_Length - 24]

    def display_content(self):
        """Print the options bytes and the trailing length copy as hex."""
        print(binascii.b2a_hex(self._Options))
        print(binascii.b2a_hex(self._Block_Length_redundant))


class InterfaceDescriptionBlock:
    """First 8 bytes (type and length) of an Interface Description Block (IDB)."""

    def __init__(self, data):
        self._Block_Type = data[0:4]
        self._Block_Length = data[4:8]
        # Decoded as little-endian, like the SHB length.
        self._decimal_number = int.from_bytes(self._Block_Length, byteorder='little')

    def display_content(self):
        """Print the raw type and length as hex, then the decoded length."""
        print(binascii.b2a_hex(self._Block_Type))
        print(binascii.b2a_hex(self._Block_Length))
        print(self._decimal_number)


class PacketBlocks_a:
    """Fixed 28-byte header of a packet block (PB).

    Raw fields are kept as byte slices; the block length and the two packet
    lengths are also decoded as little-endian integers (``*_int`` attributes).
    """

    def __init__(self, data):
        self._Block_Type = data[0:4]
        self._Block_Length = data[4:8]
        self._Block_Length_int = int.from_bytes(self._Block_Length, byteorder='little')
        self._Interface_ID = data[8:12]
        self._Timestamp_Upper = data[12:16]
        self._Timestamp_Lower = data[16:20]
        self._Captured_Packet_Length = data[20:24]
        self._Captured_Packet_Length_int = int.from_bytes(
            self._Captured_Packet_Length, byteorder='little')
        self._Original_Packet_Length = data[24:28]
        self._Original_Packet_Length_int = int.from_bytes(
            self._Original_Packet_Length, byteorder='little')

    def display_content(self):
        """Print each field with its label (raw fields as hex)."""
        print('Block_Type', binascii.b2a_hex(self._Block_Type))
        print('Block_Length', binascii.b2a_hex(self._Block_Length))
        print('Block_Length_int', self._Block_Length_int)
        print('Interface_ID', binascii.b2a_hex(self._Interface_ID))
        print('Timestamp_Upper', binascii.b2a_hex(self._Timestamp_Upper))
        print('Timestamp_Lower', binascii.b2a_hex(self._Timestamp_Lower))
        print('Captured_Packet_Length', binascii.b2a_hex(self._Captured_Packet_Length))
        print('Captured_Packet_Length_int', self._Captured_Packet_Length_int)
        print('Original_Packet_Length', binascii.b2a_hex(self._Original_Packet_Length))
        print('Original_Packet_Length_int', self._Original_Packet_Length_int)


class PacketBlocks_b:
    """Remainder of a packet block after its 28-byte header.

    ``data`` must start right after the header. ``Block_Length`` is the total
    block length and ``Packet_Length`` the number of packet-data bytes to
    take. Everything between the packet data and the trailing 4-byte length
    copy is stored in ``_Options``.
    """

    def __init__(self, data, Block_Length, Packet_Length):
        self._Packet_Data = data[0:Packet_Length]
        self._Options = data[Packet_Length:Block_Length - 32]
        self._Block_Total_Length_redundant = data[Block_Length - 32:Block_Length - 28]

    def display_content(self):
        """Print packet data, options and the trailing length copy as hex."""
        print('Packet_Data', binascii.b2a_hex(self._Packet_Data))
        print('Options', binascii.b2a_hex(self._Options))
        print('Block_Total_Length_redundant',
              binascii.b2a_hex(self._Block_Total_Length_redundant))


class PB_Options:
    """One option record: 2-byte code, 2-byte little-endian length, value.

    All bytes following the value are kept in ``_End_of_Options``.
    """

    def __init__(self, data):
        self._Option_Code = data[0:2]
        self._Option_Length = data[2:4]
        self._Option_Length_int = int.from_bytes(self._Option_Length, byteorder='little')
        self._Option_Value = data[4:self._Option_Length_int + 4]
        self._End_of_Options = data[self._Option_Length_int + 4:]

    def display_content(self):
        """Print code, length (hex and decoded), value and the remainder."""
        print(binascii.b2a_hex(self._Option_Code))
        print(binascii.b2a_hex(self._Option_Length))
        print(self._Option_Length_int)
        print(binascii.b2a_hex(self._Option_Value))
        print(binascii.b2a_hex(self._End_of_Options))


def _build_session_comments(tshark_output, appsketch_api, pcap_file_id):
    """Turn tab-separated ``frame.number / tcp.stream / udp.stream`` rows into session dicts."""
    comment_str_list = []
    for line in tshark_output.splitlines():
        fields = line.split("\t")
        # Pad short rows so a missing column counts as "no stream" instead of
        # raising IndexError.
        fields += [''] * (3 - len(fields))
        tcp_stream, udp_stream = fields[1], fields[2]
        if tcp_stream != '':
            proto, stream_id = 'tcp', tcp_stream
        elif udp_stream != '':
            proto, stream_id = 'udp', udp_stream
        else:
            proto, stream_id = 'other', ''
        comment_str_list.append({
            'session': f"{appsketch_api}/session/{pcap_file_id}/{proto}/{stream_id}",
        })
    return comment_str_list


def ExtractPacketsCommentStr(input_file_path, appsketch_api, pcap_file_id):
    """Build one session-comment dict per frame of a capture file.

    Runs ``tshark`` to list every frame's number, TCP stream index and UDP
    stream index, then maps each frame to
    ``{'session': '<appsketch_api>/session/<pcap_file_id>/<proto>/<stream_id>'}``
    where ``proto`` is ``tcp``, ``udp`` or ``other`` (with an empty stream id).

    Args:
        input_file_path: Capture file handed to ``tshark -r``.
        appsketch_api: Prefix of the generated session URL.
        pcap_file_id: Identifier placed in the session URL.

    Returns:
        A list of dicts in tshark output order; empty if tshark printed
        nothing (its exit status is not checked).
    """
    tshark_cmd = ['tshark', '-r', input_file_path, '-T', 'fields',
                  '-e', 'frame.number', '-e', 'tcp.stream', '-e', 'udp.stream']
    result = subprocess.run(tshark_cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, text=True)
    return _build_session_comments(result.stdout, appsketch_api, pcap_file_id)


def TransferPcapNG(input_file_path):
    """Return the path of a pcapng version of ``input_file_path``.

    If the file already starts with the pcapng Section Header Block magic,
    the input path is returned unchanged. Otherwise ``tshark -r ... -w ...``
    is asked to write ``<stem>.pcapng`` next to the input file, and that
    path is returned.

    Args:
        input_file_path: Path of the capture file to inspect.

    Returns:
        The input path (already pcapng) or the path of the converted file.
        tshark's exit status is not checked, so the returned file may be
        missing if the conversion failed.
    """
    with open(input_file_path, 'rb') as f:
        first_four_bytes = f.read(4)
    if first_four_bytes == _PCAPNG_SHB_MAGIC:
        return input_file_path

    stem = os.path.splitext(os.path.basename(input_file_path))[0]
    # os.path.join keeps a bare file name relative; plain '/' concatenation
    # turned "capture.pcap" into "/capture.pcapng" at the filesystem root.
    transfer_file_path = os.path.join(os.path.dirname(input_file_path), stem + '.pcapng')
    tshark_cmd = ['tshark', '-r', input_file_path, '-w', transfer_file_path]
    subprocess.run(tshark_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    return transfer_file_path


if __name__ == "__main__":
    input_file_path = TransferPcapNG(sys.argv[1])
    output_file_path = sys.argv[2]
    appsketch_api = sys.argv[3]
    pcap_file_id = sys.argv[4]
    comment_str_list = ExtractPacketsCommentStr(input_file_path, appsketch_api, pcap_file_id)
    # for i, pkt in enumerate(comment_str_list):
    #     print(comment_str_list[i])
    binfile_r = open(input_file_path, 'rb')
    binfile_w = open(input_file_path, 'rb')
    binfile_output 
= open(output_file_path, 'wb') size = os.path.getsize(input_file_path) # Read Pcapng SHB section_header_block_a = SectionHeaderBlock_a(binfile_r.read(24)) # section_header_block_a.display_content() SHB_length = section_header_block_a._decimal_number # Remaining bytes read binfile_r.read(SHB_length - 24) # Read Pcapng IDB interface_description_block = InterfaceDescriptionBlock(binfile_r.read(8)) # interface_description_block.display_content() IDB_length = interface_description_block._decimal_number # Remaining bytes read binfile_r.read(IDB_length - 8 ) # 4Byte alignment # Get the number of bytes currently read current_position = binfile_r.tell() # Calculate the next 4-byte aligned position aligned_position = (current_position + 3) & ~3 align_null_bytes_SHB = bytes.fromhex('') for i in range(aligned_position - current_position): align_null_bytes_SHB += bytes.fromhex('00') write_data = binfile_w.read(current_position) binfile_output.write(write_data) binfile_output.write(align_null_bytes_SHB) binfile_w.close() # Packets Traversal Add Option for i in range(len(comment_str_list)): # Read Pcapng Packet Blocks packet_block_a = PacketBlocks_a(binfile_r.read(28)) # packet_block_a.display_content() packet_block_data = binfile_r.read(packet_block_a._Captured_Packet_Length_int) # print('Packet_Data', binascii.b2a_hex(packet_block_data)) # 4Byte alignment # Get the number of bytes currently read current_position = binfile_r.tell() # Calculate the next 4-byte aligned position aligned_position = (current_position + 3) & ~3 align_null_bytes_PB_a = bytes.fromhex('') for j in range(aligned_position - current_position): align_null_bytes_PB_a += bytes.fromhex('00') # 使用 `json.dumps` 方法将 JSON 对象格式化为带缩进的字符串 remark_str = json.dumps(comment_str_list[i], indent=4) for j in range(4 - len(remark_str.encode('utf-8')) % 4): binary_str = '00' char = chr(int(binary_str, 2)) remark_str += char Block_Type = packet_block_a._Block_Type # Modify Block_Length # Block_Length = 
packet_block_a._Block_Length Block_Length = struct.pack('