This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
appsketch-works-pcap-comment/PcapNGFormatAnalys.py
2024-08-21 07:56:53 +00:00

274 lines
12 KiB
Python

#!/usr/bin/evn python
# -*- coding:utf-8 -*-
import binascii
import sys
import os
import subprocess
import json
import struct
class SectionHeaderBlock_a: # SHB
def __init__(self, data):
self._Block_Type = data[0:4]
self._Block_Length = data[4:8]
self._Byte_Order_Magic = data[8:12]
self._Major_Version = data[12:14]
self._Minor_Version = data[14:16]
self._Section_Length = data[16:24]
self._decimal_number = int.from_bytes(self._Block_Length, byteorder='little')
def display_content(self):
print(binascii.b2a_hex(self._Block_Type))
print(binascii.b2a_hex(self._Block_Length))
print(binascii.b2a_hex(self._Byte_Order_Magic))
print(binascii.b2a_hex(self._Major_Version))
print(binascii.b2a_hex(self._Minor_Version))
print(binascii.b2a_hex(self._Section_Length))
print(self._decimal_number)
class SectionHeaderBlock_b: # SHB
def __init__(self, data, Block_Length):
self._Options = data[0:Block_Length - 24 - 4 ]
self._Block_Length_redundant = data[Block_Length - 24 - 4 : Block_Length - 24]
def display_content(self):
print(binascii.b2a_hex(self._Options))
print(binascii.b2a_hex(self._Block_Length_redundant))
class InterfaceDescriptionBlock: # IDB
def __init__(self, data):
self._Block_Type = data[0:4]
self._Block_Length = data[4:8]
self._decimal_number = int.from_bytes(self._Block_Length, byteorder='little')
def display_content(self):
print(binascii.b2a_hex(self._Block_Type))
print(binascii.b2a_hex(self._Block_Length))
print(self._decimal_number)
class PacketBlocks_a: # PB
def __init__(self, data):
self._Block_Type = data[0:4]
self._Block_Length = data[4:8]
self._Block_Length_int = int.from_bytes(self._Block_Length, byteorder='little')
self._Interface_ID = data[8:12]
self._Timestamp_Upper = data[12:16]
self._Timestamp_Lower = data[16:20]
self._Captured_Packet_Length = data[20:24]
self._Captured_Packet_Length_int = int.from_bytes(self._Captured_Packet_Length, byteorder='little')
self._Original_Packet_Length = data[24:28]
self._Original_Packet_Length_int = int.from_bytes(self._Original_Packet_Length, byteorder='little')
def display_content(self):
print('Block_Type', binascii.b2a_hex(self._Block_Type))
print('Block_Length', binascii.b2a_hex(self._Block_Length))
print('Block_Length_int', self._Block_Length_int)
print('Interface_ID', binascii.b2a_hex(self._Interface_ID))
print('Timestamp_Upper', binascii.b2a_hex(self._Timestamp_Upper))
print('Timestamp_Lower', binascii.b2a_hex(self._Timestamp_Lower))
print('Captured_Packet_Length', binascii.b2a_hex(self._Captured_Packet_Length))
print('Captured_Packet_Length_int', self._Captured_Packet_Length_int)
print('Original_Packet_Length', binascii.b2a_hex(self._Original_Packet_Length))
print('Original_Packet_Length_int', self._Original_Packet_Length_int)
class PacketBlocks_b: # PB
def __init__(self, data, Block_Length, Packet_Length):
self._Packet_Data= data[0:Packet_Length]
self._Options = data[Packet_Length:Block_Length - 32]
self._Block_Total_Length_redundant = data[Block_Length - 32: Block_Length - 28]
def display_content(self):
print('Packet_Data', binascii.b2a_hex(self._Packet_Data))
print('Options', binascii.b2a_hex(self._Options))
print('Block_Total_Length_redundant', binascii.b2a_hex(self._Block_Total_Length_redundant))
class PB_Options:
def __init__(self, data):
self._Option_Code= data[0:2]
self._Option_Length = data[2:4]
self._Option_Length_int = int.from_bytes(self._Option_Length, byteorder='little')
self._Option_Value = data[4:self._Option_Length_int + 4]
self._End_of_Options = data[self._Option_Length_int + 4 : ]
def display_content(self):
print(binascii.b2a_hex(self._Option_Code))
print(binascii.b2a_hex(self._Option_Length))
print(self._Option_Length_int)
print(binascii.b2a_hex(self._Option_Value))
print(binascii.b2a_hex(self._End_of_Options))
def ExtractPacketsCommentStr(input_file_path, appsketch_api, pcap_file_id):
# Use tshark to extract the five_tuple info
tshark_cmd = ['tshark', '-r', input_file_path, '-T', 'fields', '-e', 'frame.number', '-e', 'tcp.stream', '-e', 'udp.stream']
ret_str = subprocess.run(tshark_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
decoded_result = ret_str.stdout
list_1 = decoded_result.splitlines()
list_output = []
for i in range(0, len(list_1)) :
list_2 = list_1[i].split("\t")
list_output.append(list_2)
comment_str_list = []
for i in range(len(list_output)):
# frame_id = list_output[i][0]
session_dict = {}
if list_output[i][1] != '':
proto = 'tcp'
stream_id = list_output[i][1]
elif list_output[i][1] == '' and list_output[i][2] != '':
proto = 'udp'
stream_id = list_output[i][2]
else:
proto = 'other'
stream_id = ''
session_dict['session'] = appsketch_api + '/session/' + pcap_file_id + '/' + proto + '/' + stream_id
comment_str_list.append(session_dict)
return comment_str_list
def TransferPcapNG(input_file_path):
with open(input_file_path, 'rb') as f:
first_four_bytes = f.read(4)
if first_four_bytes == bytes.fromhex('0a0d0d0a') or first_four_bytes == bytes.fromhex('0A0D0D0A'):
return input_file_path
else:
# Get the file name (including extension)
file_name_with_extension = os.path.basename(input_file_path)
# Remove the extension
transfer_file_path = os.path.splitext(file_name_with_extension)[0] + '.pcapng'
# Get the directory path without the file name
directory_path = os.path.dirname(input_file_path)
tshark_cmd = ['tshark', '-r', input_file_path, '-w', directory_path + '/' + transfer_file_path]
subprocess.run(tshark_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
input_file_path = './output/Transfer.pcapng'
return directory_path + '/' + transfer_file_path
if __name__=="__main__":
input_file_path = TransferPcapNG(sys.argv[1])
output_file_path = sys.argv[2]
appsketch_api = sys.argv[3]
pcap_file_id = sys.argv[4]
comment_str_list = ExtractPacketsCommentStr(input_file_path, appsketch_api, pcap_file_id)
# for i, pkt in enumerate(comment_str_list):
# print(comment_str_list[i])
binfile_r = open(input_file_path, 'rb')
binfile_w = open(input_file_path, 'rb')
binfile_output = open(output_file_path, 'wb')
size = os.path.getsize(input_file_path)
# Read Pcapng SHB
section_header_block_a = SectionHeaderBlock_a(binfile_r.read(24))
# section_header_block_a.display_content()
SHB_length = section_header_block_a._decimal_number
# Remaining bytes read
binfile_r.read(SHB_length - 24)
# Read Pcapng IDB
interface_description_block = InterfaceDescriptionBlock(binfile_r.read(8))
# interface_description_block.display_content()
IDB_length = interface_description_block._decimal_number
# Remaining bytes read
binfile_r.read(IDB_length - 8 )
# 4Byte alignment
# Get the number of bytes currently read
current_position = binfile_r.tell()
# Calculate the next 4-byte aligned position
aligned_position = (current_position + 3) & ~3
align_null_bytes_SHB = bytes.fromhex('')
for i in range(aligned_position - current_position):
align_null_bytes_SHB += bytes.fromhex('00')
write_data = binfile_w.read(current_position)
binfile_output.write(write_data)
binfile_output.write(align_null_bytes_SHB)
binfile_w.close()
# Packets Traversal Add Option
for i in range(len(comment_str_list)):
# Read Pcapng Packet Blocks
packet_block_a = PacketBlocks_a(binfile_r.read(28))
# packet_block_a.display_content()
packet_block_data = binfile_r.read(packet_block_a._Captured_Packet_Length_int)
# print('Packet_Data', binascii.b2a_hex(packet_block_data))
# 4Byte alignment
# Get the number of bytes currently read
current_position = binfile_r.tell()
# Calculate the next 4-byte aligned position
aligned_position = (current_position + 3) & ~3
align_null_bytes_PB_a = bytes.fromhex('')
for j in range(aligned_position - current_position):
align_null_bytes_PB_a += bytes.fromhex('00')
# 使用 `json.dumps` 方法将 JSON 对象格式化为带缩进的字符串
remark_str = json.dumps(comment_str_list[i], indent=4)
for j in range(4 - len(remark_str.encode('utf-8')) % 4):
binary_str = '00'
char = chr(int(binary_str, 2))
remark_str += char
Block_Type = packet_block_a._Block_Type
# Modify Block_Length
# Block_Length = packet_block_a._Block_Length
Block_Length = struct.pack('<I', packet_block_a._Block_Length_int + 8 + len(remark_str))
Interface_ID = packet_block_a._Interface_ID
Timestamp_Upper = packet_block_a._Timestamp_Upper
Timestamp_Lower = packet_block_a._Timestamp_Lower
Captured_Packet_Length = packet_block_a._Captured_Packet_Length
Original_Packet_Length = packet_block_a._Original_Packet_Length
# Modify Option_Length
Option_Code = bytes.fromhex('0100')
Option_Length = struct.pack('<H', len(remark_str))
Option_Value = remark_str.encode('utf-8')
End_of_Options = bytes.fromhex('00000000')
current_position = 2 + 2 + len(remark_str) + 4
aligned_position = (current_position + 3) & ~3
align_null_bytes_PB_b = bytes.fromhex('')
for j in range(aligned_position - current_position):
align_null_bytes_PB_b += bytes.fromhex('00')
# Modify Block_Total_Length_Redundant
Block_Total_Length_Redundant = struct.pack('<I', packet_block_a._Block_Length_int + 8 + len(remark_str))
write_data = Block_Type + Block_Length + Interface_ID + Timestamp_Upper + Timestamp_Lower + Captured_Packet_Length + Original_Packet_Length + \
packet_block_data + align_null_bytes_PB_a + \
Option_Code + Option_Length + Option_Value + End_of_Options + align_null_bytes_PB_b + \
Block_Total_Length_Redundant
binfile_output.write(write_data)
binfile_r.read(4)
if binfile_r.tell() == size:
break
# Get the number of bytes currently read
current_position = binfile_r.tell()
# Calculate the next 4-byte aligned position
aligned_position = (current_position + 3) & ~3
# Move the file pointer to a 4-byte aligned position
binfile_r.seek(aligned_position)
try:
os.remove(input_file_path)
print(f"File '{input_file_path}' has been deleted successfully.")
except FileNotFoundError:
print(f"File '{input_file_path}' does not exist.")
except PermissionError:
print(f"Permission denied: '{input_file_path}'.")
except Exception as e:
print(f"An error occurred: {e}")