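"""Count NAT session-log event types in UDP payloads read from pcap files.

Two fixed-layout record formats are handled, named after the labels this
script prints: ``hw_binary`` (a 16-byte header followed by ``Count`` 64-byte
records) and ``nat_format`` (up to 20 records of 46 bytes per datagram).

The payloads come straight from a capture file and are therefore untrusted:
every offset is bounds-checked against the real payload length, and the
record count declared in the header is never trusted on its own.

NOTE(review): nothing here identifies the log format. Every datagram of the
selected protocol is parsed as a log record, so the capture should be
filtered down to the log collector's traffic before it is fed to this
script; otherwise unrelated UDP traffic skews the statistics.
"""

try:
    from scapy.all import rdpcap, UDP
except ImportError:
    # The parsers below are plain string code; only the pcap readers need
    # scapy, and they raise a clear error when it is missing.
    rdpcap = UDP = None

# Field widths in bytes, in wire order.
# Header: Version, LogType, Count, Second, FlowSequence, DeviceId, Slot,
# Reserved.
_HW_HEADER_FIELD_BYTES = (1, 1, 2, 4, 4, 2, 1, 1)
# Record: Prot, Operator, IpVersion, TosIPv4, SourceIP, SrcNatIP, DestIP,
# DestNatIP, SrcPort, SrcNatPort, DestPort, DestNatPort, StartTime, EndTime,
# InTotalPkg, InTotalByte, OutTotalPkg, OutTotalByte, SourVpnIndex,
# DestVpnIndex, Reserved1, EventTrend, Reserved2, Reserved3.
_HW_RECORD_FIELD_BYTES = (
    1, 1, 1, 1, 4, 4, 4, 4, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 2, 2, 1, 1, 2, 4,
)

# Every payload is handled as a hex string, so one byte is two characters.
_HW_HEADER_HEX_LEN = 2 * sum(_HW_HEADER_FIELD_BYTES)
_HW_RECORD_HEX_LEN = 2 * sum(_HW_RECORD_FIELD_BYTES)
_HW_COUNT_HEX = slice(4, 8)   # Count field inside the header
_HW_TREND_HEX_OFFSET = 114    # EventTrend byte inside a record
_HW_TREND_LABELS = (
    "unknown(00)", "build(01)", "aged(02)", "period(03)", "unknown(>03)",
)

_NAT_RECORD_HEX_LEN = 46 * 2
_NAT_RECORDS_PER_PACKET = 20
_NAT_TREND_HEX_OFFSET = 36    # EventTrend byte inside a record
_NAT_TREND_LABELS = ("add(00)", "del(01)", "unknown(>01)")


def _split_fields(hex_str, byte_widths):
    """Cut *hex_str* into consecutive fields of the given byte widths."""
    fields = []
    start = 0
    for width in byte_widths:
        end = start + 2 * width
        fields.append(hex_str[start:end])
        start = end
    return fields


def parser_hw_binary_detail(s):
    """Print the header fields and every record of one hw_binary payload.

    Args:
        s: the UDP payload as a lowercase hex string.

    The header is printed first, then one list of hex fields per record.
    Printing stops with a notice at the first incomplete record, so a
    forged ``Count`` cannot produce thousands of empty lines.
    """
    head = _split_fields(s, _HW_HEADER_FIELD_BYTES)
    print(head)
    if len(s) < _HW_HEADER_HEX_LEN:
        print(f"truncated header: {len(s)} of {_HW_HEADER_HEX_LEN} hex characters")
        return

    declared = int(head[2], 16)
    offset = _HW_HEADER_HEX_LEN
    for index in range(declared):
        record = s[offset:offset + _HW_RECORD_HEX_LEN]
        if len(record) < _HW_RECORD_HEX_LEN:
            print(f"truncated payload: record {index + 1} of {declared} is incomplete")
            break
        print(_split_fields(record, _HW_RECORD_FIELD_BYTES))
        # Advance to the next record; without this every iteration would
        # print the first record again.
        offset += _HW_RECORD_HEX_LEN


def parser_hw_binary(s):
    """Tally the EventTrend values of one hw_binary payload.

    Args:
        s: the UDP payload as a hex string.

    Returns:
        ``(records, counts)`` where ``records`` is the number of complete
        records actually parsed and ``counts[i]`` is how many of them had
        EventTrend ``i``; index 4 collects every value above 3.

    A payload shorter than the header yields ``(0, [0, 0, 0, 0, 0])``.
    ``records`` equals the header's ``Count`` for a well-formed payload and
    is lower when the payload is truncated, so ``sum(counts)`` always
    matches it.
    """
    counts = [0] * 5
    if len(s) < _HW_HEADER_HEX_LEN:
        return (0, counts)

    declared = int(s[_HW_COUNT_HEX], 16)
    # Count is sender-controlled: never read more records than the payload
    # really carries.
    available = (len(s) - _HW_HEADER_HEX_LEN) // _HW_RECORD_HEX_LEN
    parsed = min(declared, available)
    for index in range(parsed):
        pos = _HW_HEADER_HEX_LEN + index * _HW_RECORD_HEX_LEN + _HW_TREND_HEX_OFFSET
        # The field is one byte rendered as two hex digits, so it must be
        # parsed base 16 (a decimal parse fails on values such as "0a").
        trend = int(s[pos:pos + 2], 16)
        counts[min(trend, 4)] += 1
    return (parsed, counts)


def parser_nat(s):
    """Tally the EventTrend values of one nat_format payload.

    Args:
        s: the UDP payload as a hex string.

    Returns:
        A three-element list: records with EventTrend 0, with EventTrend 1,
        and with any larger value. At most 20 records are read and only
        complete 46-byte records are counted.
    """
    counts = [0] * 3
    complete = min(len(s) // _NAT_RECORD_HEX_LEN, _NAT_RECORDS_PER_PACKET)
    for index in range(complete):
        pos = index * _NAT_RECORD_HEX_LEN + _NAT_TREND_HEX_OFFSET
        trend = int(s[pos:pos + 2], 16)
        counts[min(trend, 2)] += 1
    return counts


def _udp_payload_hex(pcap_file, protocol):
    """Yield the UDP payload, as a hex string, of each matching packet."""
    if rdpcap is None:
        raise ImportError("scapy is required to read pcap files")
    for pkt in rdpcap(pcap_file):
        # The payload is always taken from the UDP layer, so the packet has
        # to carry one whatever *protocol* was asked for.
        if protocol in pkt and UDP in pkt:
            yield bytes(pkt[UDP].payload).hex()


def _print_breakdown(labels, counts, total):
    """Print one ``label log: count/total(percent%)`` line per category."""
    for label, count in zip(labels, counts):
        # Scale to a real percentage and survive a capture with no records.
        share = 100 * count / total if total else 0.0
        print(f"{label} log:\t{count}/{total}({share:.2f}%)")


def read_and_filter_pcap_hw(pcap_file, protocol="UDP"):
    """Print EventTrend statistics for the hw_binary logs in *pcap_file*.

    Args:
        pcap_file: path of the capture to read.
        protocol: layer name a packet must contain to be parsed.
    """
    matched = 0
    record_total = 0
    totals = [0] * 5
    for hex_payload in _udp_payload_hex(pcap_file, protocol):
        matched += 1
        parsed, counts = parser_hw_binary(hex_payload)
        record_total += parsed
        totals = [a + b for a, b in zip(totals, counts)]

    # Report the packets that were parsed, not every packet in the file.
    print(f"Total {matched} {protocol} packets in hw_binary found.")
    _print_breakdown(_HW_TREND_LABELS, totals, record_total)


def read_and_filter_pcap_nat(pcap_file, protocol="UDP"):
    """Print EventTrend statistics for the nat_format logs in *pcap_file*.

    Args:
        pcap_file: path of the capture to read.
        protocol: layer name a packet must contain to be parsed.
    """
    matched = 0
    totals = [0] * 3
    for hex_payload in _udp_payload_hex(pcap_file, protocol):
        matched += 1
        counts = parser_nat(hex_payload)
        totals = [a + b for a, b in zip(totals, counts)]

    print(f"Total {matched} {protocol} packets in nat_format found.")
    # The denominator is the number of records really parsed, which stays
    # correct when the capture holds other packets or short datagrams.
    _print_breakdown(_NAT_TREND_LABELS, totals, sum(totals))


if __name__ == "__main__":
    pcap_file_path = "D://MESA//搞点项目//NAT//nat-pcap//hw_binary_1021.pcap"
    read_and_filter_pcap_hw(pcap_file_path)
    pcap_file_path = "D://MESA//搞点项目//NAT//nat-pcap//nat_1021.pcap"
    read_and_filter_pcap_nat(pcap_file_path)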