274 lines
12 KiB
Python
274 lines
12 KiB
Python
#!/usr/bin/evn python
|
|
# -*- coding:utf-8 -*-
|
|
|
|
import binascii
|
|
import sys
|
|
import os
|
|
import subprocess
|
|
import json
|
|
import struct
|
|
|
|
class SectionHeaderBlock_a: # SHB
|
|
|
|
def __init__(self, data):
|
|
|
|
self._Block_Type = data[0:4]
|
|
self._Block_Length = data[4:8]
|
|
self._Byte_Order_Magic = data[8:12]
|
|
self._Major_Version = data[12:14]
|
|
self._Minor_Version = data[14:16]
|
|
self._Section_Length = data[16:24]
|
|
self._decimal_number = int.from_bytes(self._Block_Length, byteorder='little')
|
|
|
|
def display_content(self):
|
|
print(binascii.b2a_hex(self._Block_Type))
|
|
print(binascii.b2a_hex(self._Block_Length))
|
|
print(binascii.b2a_hex(self._Byte_Order_Magic))
|
|
print(binascii.b2a_hex(self._Major_Version))
|
|
print(binascii.b2a_hex(self._Minor_Version))
|
|
print(binascii.b2a_hex(self._Section_Length))
|
|
print(self._decimal_number)
|
|
|
|
class SectionHeaderBlock_b: # SHB
|
|
def __init__(self, data, Block_Length):
|
|
self._Options = data[0:Block_Length - 24 - 4 ]
|
|
self._Block_Length_redundant = data[Block_Length - 24 - 4 : Block_Length - 24]
|
|
|
|
def display_content(self):
|
|
print(binascii.b2a_hex(self._Options))
|
|
print(binascii.b2a_hex(self._Block_Length_redundant))
|
|
|
|
class InterfaceDescriptionBlock: # IDB
|
|
def __init__(self, data):
|
|
self._Block_Type = data[0:4]
|
|
self._Block_Length = data[4:8]
|
|
self._decimal_number = int.from_bytes(self._Block_Length, byteorder='little')
|
|
|
|
def display_content(self):
|
|
print(binascii.b2a_hex(self._Block_Type))
|
|
print(binascii.b2a_hex(self._Block_Length))
|
|
print(self._decimal_number)
|
|
|
|
class PacketBlocks_a: # PB
|
|
def __init__(self, data):
|
|
self._Block_Type = data[0:4]
|
|
self._Block_Length = data[4:8]
|
|
self._Block_Length_int = int.from_bytes(self._Block_Length, byteorder='little')
|
|
self._Interface_ID = data[8:12]
|
|
self._Timestamp_Upper = data[12:16]
|
|
self._Timestamp_Lower = data[16:20]
|
|
self._Captured_Packet_Length = data[20:24]
|
|
self._Captured_Packet_Length_int = int.from_bytes(self._Captured_Packet_Length, byteorder='little')
|
|
self._Original_Packet_Length = data[24:28]
|
|
self._Original_Packet_Length_int = int.from_bytes(self._Original_Packet_Length, byteorder='little')
|
|
|
|
def display_content(self):
|
|
print('Block_Type', binascii.b2a_hex(self._Block_Type))
|
|
print('Block_Length', binascii.b2a_hex(self._Block_Length))
|
|
print('Block_Length_int', self._Block_Length_int)
|
|
print('Interface_ID', binascii.b2a_hex(self._Interface_ID))
|
|
print('Timestamp_Upper', binascii.b2a_hex(self._Timestamp_Upper))
|
|
print('Timestamp_Lower', binascii.b2a_hex(self._Timestamp_Lower))
|
|
print('Captured_Packet_Length', binascii.b2a_hex(self._Captured_Packet_Length))
|
|
print('Captured_Packet_Length_int', self._Captured_Packet_Length_int)
|
|
print('Original_Packet_Length', binascii.b2a_hex(self._Original_Packet_Length))
|
|
print('Original_Packet_Length_int', self._Original_Packet_Length_int)
|
|
|
|
class PacketBlocks_b: # PB
|
|
def __init__(self, data, Block_Length, Packet_Length):
|
|
self._Packet_Data= data[0:Packet_Length]
|
|
self._Options = data[Packet_Length:Block_Length - 32]
|
|
self._Block_Total_Length_redundant = data[Block_Length - 32: Block_Length - 28]
|
|
|
|
def display_content(self):
|
|
print('Packet_Data', binascii.b2a_hex(self._Packet_Data))
|
|
print('Options', binascii.b2a_hex(self._Options))
|
|
print('Block_Total_Length_redundant', binascii.b2a_hex(self._Block_Total_Length_redundant))
|
|
|
|
class PB_Options:
|
|
def __init__(self, data):
|
|
self._Option_Code= data[0:2]
|
|
self._Option_Length = data[2:4]
|
|
self._Option_Length_int = int.from_bytes(self._Option_Length, byteorder='little')
|
|
self._Option_Value = data[4:self._Option_Length_int + 4]
|
|
self._End_of_Options = data[self._Option_Length_int + 4 : ]
|
|
|
|
def display_content(self):
|
|
print(binascii.b2a_hex(self._Option_Code))
|
|
print(binascii.b2a_hex(self._Option_Length))
|
|
print(self._Option_Length_int)
|
|
print(binascii.b2a_hex(self._Option_Value))
|
|
print(binascii.b2a_hex(self._End_of_Options))
|
|
|
|
def ExtractPacketsCommentStr(input_file_path, appsketch_api, pcap_file_id):
|
|
# Use tshark to extract the five_tuple info
|
|
tshark_cmd = ['tshark', '-r', input_file_path, '-T', 'fields', '-e', 'frame.number', '-e', 'tcp.stream', '-e', 'udp.stream']
|
|
ret_str = subprocess.run(tshark_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
decoded_result = ret_str.stdout
|
|
list_1 = decoded_result.splitlines()
|
|
list_output = []
|
|
for i in range(0, len(list_1)) :
|
|
list_2 = list_1[i].split("\t")
|
|
list_output.append(list_2)
|
|
|
|
comment_str_list = []
|
|
for i in range(len(list_output)):
|
|
# frame_id = list_output[i][0]
|
|
session_dict = {}
|
|
if list_output[i][1] != '':
|
|
proto = 'tcp'
|
|
stream_id = list_output[i][1]
|
|
elif list_output[i][1] == '' and list_output[i][2] != '':
|
|
proto = 'udp'
|
|
stream_id = list_output[i][2]
|
|
else:
|
|
proto = 'other'
|
|
stream_id = ''
|
|
session_dict['session'] = appsketch_api + '/session/' + pcap_file_id + '/' + proto + '/' + stream_id
|
|
comment_str_list.append(session_dict)
|
|
|
|
return comment_str_list
|
|
|
|
def TransferPcapNG(input_file_path):
|
|
with open(input_file_path, 'rb') as f:
|
|
first_four_bytes = f.read(4)
|
|
if first_four_bytes == bytes.fromhex('0a0d0d0a') or first_four_bytes == bytes.fromhex('0A0D0D0A'):
|
|
return input_file_path
|
|
else:
|
|
# Get the file name (including extension)
|
|
file_name_with_extension = os.path.basename(input_file_path)
|
|
# Remove the extension
|
|
transfer_file_path = os.path.splitext(file_name_with_extension)[0] + '.pcapng'
|
|
# Get the directory path without the file name
|
|
directory_path = os.path.dirname(input_file_path)
|
|
|
|
tshark_cmd = ['tshark', '-r', input_file_path, '-w', directory_path + '/' + transfer_file_path]
|
|
subprocess.run(tshark_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
|
input_file_path = './output/Transfer.pcapng'
|
|
return directory_path + '/' + transfer_file_path
|
|
|
|
if __name__=="__main__":
|
|
|
|
input_file_path = TransferPcapNG(sys.argv[1])
|
|
output_file_path = sys.argv[2]
|
|
appsketch_api = sys.argv[3]
|
|
pcap_file_id = sys.argv[4]
|
|
|
|
comment_str_list = ExtractPacketsCommentStr(input_file_path, appsketch_api, pcap_file_id)
|
|
# for i, pkt in enumerate(comment_str_list):
|
|
# print(comment_str_list[i])
|
|
|
|
binfile_r = open(input_file_path, 'rb')
|
|
binfile_w = open(input_file_path, 'rb')
|
|
binfile_output = open(output_file_path, 'wb')
|
|
|
|
size = os.path.getsize(input_file_path)
|
|
|
|
# Read Pcapng SHB
|
|
section_header_block_a = SectionHeaderBlock_a(binfile_r.read(24))
|
|
# section_header_block_a.display_content()
|
|
SHB_length = section_header_block_a._decimal_number
|
|
# Remaining bytes read
|
|
binfile_r.read(SHB_length - 24)
|
|
|
|
# Read Pcapng IDB
|
|
interface_description_block = InterfaceDescriptionBlock(binfile_r.read(8))
|
|
# interface_description_block.display_content()
|
|
IDB_length = interface_description_block._decimal_number
|
|
# Remaining bytes read
|
|
binfile_r.read(IDB_length - 8 )
|
|
|
|
# 4Byte alignment
|
|
# Get the number of bytes currently read
|
|
current_position = binfile_r.tell()
|
|
# Calculate the next 4-byte aligned position
|
|
aligned_position = (current_position + 3) & ~3
|
|
|
|
align_null_bytes_SHB = bytes.fromhex('')
|
|
for i in range(aligned_position - current_position):
|
|
align_null_bytes_SHB += bytes.fromhex('00')
|
|
|
|
write_data = binfile_w.read(current_position)
|
|
binfile_output.write(write_data)
|
|
binfile_output.write(align_null_bytes_SHB)
|
|
binfile_w.close()
|
|
|
|
# Packets Traversal Add Option
|
|
for i in range(len(comment_str_list)):
|
|
# Read Pcapng Packet Blocks
|
|
packet_block_a = PacketBlocks_a(binfile_r.read(28))
|
|
# packet_block_a.display_content()
|
|
|
|
packet_block_data = binfile_r.read(packet_block_a._Captured_Packet_Length_int)
|
|
# print('Packet_Data', binascii.b2a_hex(packet_block_data))
|
|
|
|
# 4Byte alignment
|
|
# Get the number of bytes currently read
|
|
current_position = binfile_r.tell()
|
|
# Calculate the next 4-byte aligned position
|
|
aligned_position = (current_position + 3) & ~3
|
|
align_null_bytes_PB_a = bytes.fromhex('')
|
|
for j in range(aligned_position - current_position):
|
|
align_null_bytes_PB_a += bytes.fromhex('00')
|
|
|
|
# 使用 `json.dumps` 方法将 JSON 对象格式化为带缩进的字符串
|
|
remark_str = json.dumps(comment_str_list[i], indent=4)
|
|
|
|
for j in range(4 - len(remark_str.encode('utf-8')) % 4):
|
|
binary_str = '00'
|
|
char = chr(int(binary_str, 2))
|
|
remark_str += char
|
|
|
|
Block_Type = packet_block_a._Block_Type
|
|
# Modify Block_Length
|
|
# Block_Length = packet_block_a._Block_Length
|
|
Block_Length = struct.pack('<I', packet_block_a._Block_Length_int + 8 + len(remark_str))
|
|
|
|
Interface_ID = packet_block_a._Interface_ID
|
|
Timestamp_Upper = packet_block_a._Timestamp_Upper
|
|
Timestamp_Lower = packet_block_a._Timestamp_Lower
|
|
Captured_Packet_Length = packet_block_a._Captured_Packet_Length
|
|
Original_Packet_Length = packet_block_a._Original_Packet_Length
|
|
|
|
# Modify Option_Length
|
|
Option_Code = bytes.fromhex('0100')
|
|
Option_Length = struct.pack('<H', len(remark_str))
|
|
Option_Value = remark_str.encode('utf-8')
|
|
End_of_Options = bytes.fromhex('00000000')
|
|
|
|
current_position = 2 + 2 + len(remark_str) + 4
|
|
aligned_position = (current_position + 3) & ~3
|
|
align_null_bytes_PB_b = bytes.fromhex('')
|
|
for j in range(aligned_position - current_position):
|
|
align_null_bytes_PB_b += bytes.fromhex('00')
|
|
|
|
# Modify Block_Total_Length_Redundant
|
|
Block_Total_Length_Redundant = struct.pack('<I', packet_block_a._Block_Length_int + 8 + len(remark_str))
|
|
|
|
write_data = Block_Type + Block_Length + Interface_ID + Timestamp_Upper + Timestamp_Lower + Captured_Packet_Length + Original_Packet_Length + \
|
|
packet_block_data + align_null_bytes_PB_a + \
|
|
Option_Code + Option_Length + Option_Value + End_of_Options + align_null_bytes_PB_b + \
|
|
Block_Total_Length_Redundant
|
|
|
|
binfile_output.write(write_data)
|
|
binfile_r.read(4)
|
|
|
|
if binfile_r.tell() == size:
|
|
break
|
|
|
|
# Get the number of bytes currently read
|
|
current_position = binfile_r.tell()
|
|
# Calculate the next 4-byte aligned position
|
|
aligned_position = (current_position + 3) & ~3
|
|
# Move the file pointer to a 4-byte aligned position
|
|
binfile_r.seek(aligned_position)
|
|
|
|
try:
|
|
os.remove(input_file_path)
|
|
print(f"File '{input_file_path}' has been deleted successfully.")
|
|
except FileNotFoundError:
|
|
print(f"File '{input_file_path}' does not exist.")
|
|
except PermissionError:
|
|
print(f"Permission denied: '{input_file_path}'.")
|
|
except Exception as e:
|
|
print(f"An error occurred: {e}") |