diff --git a/src/nat_format.cpp b/src/nat_format.cpp index 72f99a5..c67716d 100644 --- a/src/nat_format.cpp +++ b/src/nat_format.cpp @@ -21,7 +21,7 @@ #define HW_EVENT_ADD "SESSION_BUILT" #define HW_EVENT_DEL "SESSION_TEARDOWN" -#define HW_EVENT_NEW 0x01 +#define HW_EVENT_NEW 0x00 #define HW_EVENT_AGED 0x02 #define HW_EVENT_PERIOD 0x03 @@ -42,10 +42,13 @@ struct nat_format_global_info g_nat_format_info; -char *multicast_payload; -int cur_pkt = 0; +// 组播报文缓冲区和socket都是每个线程一个 +int platform_thread_num = get_thread_count(); -int udp_socket; +char **multicast_payloads; +int *cur_pkts; + +int *udp_sockets; struct sockaddr_in dst_addr = {0}; // 函数:子串匹配,返回子串结尾+1的位置,即值的起始位置 @@ -138,22 +141,22 @@ int extract_action_hs(char *data, int data_len, const char *key, char *dst) { } // 组播报文发送 -int send_multicast() { +int send_multicast(int thread_seq) { // 发之前统一填充发送时间 time_t now; time(&now); unsigned int now_timestamp = (int)now; for (int i = 0; i < g_nat_format_info.batch_size; i++) { - memcpy(multicast_payload + i*PAYLOAD_LEN+SEND_TIME_OFFSET, &now_timestamp, 4); + memcpy(multicast_payloads[thread_seq] + i*PAYLOAD_LEN+SEND_TIME_OFFSET, &now_timestamp, 4); } // 进行发送 - if (sendto(udp_socket, multicast_payload, PAYLOAD_LEN*g_nat_format_info.batch_size, 0, (struct sockaddr *)&dst_addr, sizeof(dst_addr)) < 0) { + if (sendto(udp_sockets[thread_seq], multicast_payloads[thread_seq], PAYLOAD_LEN*g_nat_format_info.batch_size, 0, (struct sockaddr *)&dst_addr, sizeof(dst_addr)) < 0) { MESA_handle_runtime_log(g_nat_format_info.log, RLOG_LV_INFO, "nat_format", "Send multicast failed: %s", strerror(errno)); - cur_pkt--; + cur_pkts[thread_seq]--; return -1; } - cur_pkt = 0; + cur_pkts[thread_seq] = 0; return 0; } @@ -185,31 +188,54 @@ int nat_format_init(void) { } // 分配并初始化组播报文存储空间,长度为 batch_size*46Bytes - multicast_payload = (char *)malloc(PAYLOAD_LEN * g_nat_format_info.batch_size); - memset(multicast_payload, 0, PAYLOAD_LEN * g_nat_format_info.batch_size); - - // 创建socket用于发包 - udp_socket = socket(AF_INET, SOCK_DGRAM, 0); - if (udp_socket == -1) { - printf("UDP multicast socket creation failed:%d, %s\n", errno, strerror(errno)); + multicast_payloads = (char **)malloc(platform_thread_num * sizeof(char *)); + if (multicast_payloads == NULL) { + printf("Memory of multicast_payloads allocation failure.\n"); return -1; } - // 绑定组播地址 + for (int i = 0; i < platform_thread_num; i++) { + multicast_payloads[i] = (char *)malloc(PAYLOAD_LEN * g_nat_format_info.batch_size); + if (multicast_payloads[i] == NULL) { + for (int j = 0; j < i; j++) { + free(multicast_payloads[j]); + } + free(multicast_payloads); + printf("Memory of multicast_payload for thread %d allocation failure.\n", i); + return -1; + } + memset(multicast_payloads[i], 0, PAYLOAD_LEN * g_nat_format_info.batch_size); + } + cur_pkts = (int *)malloc(platform_thread_num * sizeof(int)); + memset(cur_pkts, 0, platform_thread_num * sizeof(int)); + + // 组播地址 dst_addr.sin_family = AF_INET; dst_addr.sin_addr.s_addr = inet_addr(g_nat_format_info.multicast_ip); dst_addr.sin_port = htons(g_nat_format_info.multicast_port); struct in_addr multicast_addr; inet_pton(AF_INET, g_nat_format_info.multicast_ip, &multicast_addr.s_addr); - setsockopt(udp_socket, IPPROTO_IP, IP_MULTICAST_IF, &multicast_addr, sizeof(multicast_addr)); - // 绑定发包端口 + // 发包端口 struct sockaddr_in src_addr; src_addr.sin_family = AF_INET; src_addr.sin_port = htons(g_nat_format_info.host_port); src_addr.sin_addr.s_addr = inet_addr(g_nat_format_info.host_ip); - if (bind(udp_socket, (struct sockaddr*)&src_addr, sizeof(src_addr)) < 0) { - printf("socket bind failed\n"); - close(udp_socket); - return -1; + + // 创建socket用于发包 + udp_sockets = (int *)malloc(platform_thread_num * sizeof(int)); + for (int i = 0; i < platform_thread_num; i++) { + udp_sockets[i] = socket(AF_INET, SOCK_DGRAM, 0); + if (udp_sockets[i] == -1) { + free(udp_sockets); + printf("UDP multicast socket for thread %d creation failed:%d, %s\n", i, errno, strerror(errno)); + return -1; + } + setsockopt(udp_sockets[i], IPPROTO_IP, IP_MULTICAST_IF, &multicast_addr, sizeof(multicast_addr)); + if (bind(udp_sockets[i], (struct sockaddr*)&src_addr, sizeof(src_addr)) < 0) { + free(udp_sockets); + printf("socket bind for thread %d failed:%d, %s\n", i, errno, strerror(errno)); + close(udp_sockets[i]); + return -1; + } } return 0; @@ -217,7 +243,14 @@ int nat_format_init(void) { // 卸载函数 void nat_format_destroy(void) { - free(multicast_payload); + for (int j = 0; j < platform_thread_num; j++) { + free(multicast_payloads[j]); + } + free(multicast_payloads); + + free(cur_pkts); + + free(udp_sockets); } // 入口函数 @@ -328,12 +361,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void } // 将提取出来的信息写进组播载荷 - memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN); - cur_pkt++; + memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN); + cur_pkts[thread_seq]++; // 攒够20个进行发送 - if (cur_pkt == g_nat_format_info.batch_size) { - send_multicast(); + if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) { + send_multicast(thread_seq); } } // 华三syslog @@ -409,12 +442,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void } // 将提取出来的信息写进组播载荷 - memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN); - cur_pkt++; + memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN); + cur_pkts[thread_seq]++; // 攒够20个进行发送 - if (cur_pkt == g_nat_format_info.batch_size) { - send_multicast(); + if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) { + send_multicast(thread_seq); } } // 迪普syslog TODO @@ -447,7 +480,7 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void MESA_handle_runtime_log(g_nat_format_info.log, RLOG_LV_INFO, "nat_format Huawei Binary", "Unknown Version %d", hb_head->Version); return APP_STATE_GIVEME; } - u_int16_t log_num = hb_head->Count; + u_int16_t log_num = ntohs(hb_head->Count); // 提取防火墙日志生成时间 nat_payload.fw_log_timestamp = hb_head->Second; // 分别处理每一个body @@ -486,12 +519,12 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void } // 将提取出来的信息写进组播载荷 - memcpy(multicast_payload + cur_pkt*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN); - cur_pkt++; + memcpy(multicast_payloads[thread_seq] + cur_pkts[thread_seq]*PAYLOAD_LEN, &nat_payload, PAYLOAD_LEN); + cur_pkts[thread_seq]++; // 攒够20个进行发送 - if (cur_pkt == g_nat_format_info.batch_size) { - send_multicast(); + if (cur_pkts[thread_seq] == g_nat_format_info.batch_size) { + send_multicast(thread_seq); } // 定位下一个body的offset -- 03版本28,08版本的IPV4、NOPAT为44,USER为209,URL、TLV每个不同,由附加长度确定 @@ -500,7 +533,7 @@ char nat_format_entry(struct streaminfo *a_udp, void **pme, int thread_seq, void offset += body_len; } else { char *tmp = (char *)hb_body; - u_int16_t AppendLength = *(u_int16_t *)(tmp + sizeof(struct hw_binary_log_body) + HW_BINARY_BODY_V8_TLV_OFFSET); + u_int16_t AppendLength = ntohs(*(u_int16_t *)(tmp + sizeof(struct hw_binary_log_body) + HW_BINARY_BODY_V8_TLV_OFFSET)); offset += (HW_BINARY_BODY_LENGTH_V8_IPV4 + AppendLength); } } diff --git a/tools/binary_filed_extraction.py b/tools/binary_filed_extraction.py new file mode 100644 index 0000000..31573ad --- /dev/null +++ b/tools/binary_filed_extraction.py @@ -0,0 +1,102 @@ +from scapy.all import rdpcap, UDP + +def parser_hw_binary_detail(s): + len_list = [i * 2 for i in [1, 1, 2, 4, 4, 2, 1, 1]] + head_s = s[0:sum(len_list)] + head = [] + start = 0 + for l in len_list: + head.append(head_s[start:start + l]) + start += l + [Version, LogType, Count, Second, FlowSequence, DeviceId, Slot, Reserved] = head + print(head) + + body_start = sum(len_list) + len_list = [i * 2 for i in [1, 1, 1, 1, 4, 4, 4, 4, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 2, 2, 1, 1, 2, 4]] + bodys = [] + for i in range(int(Count, 16)): + body_s = s[body_start:body_start+sum(len_list)] + body = [] + start = 0 + for l in len_list: + body.append(body_s[start:start + l]) + start += l + [Prot, Operator, IpVersion, TosIPv4, SourceIP, SrcNatIP, DestIP, DestNatIP, SrcPort, SrcNatPort, DestPort, DestNatPort, StartTime, EndTime, InTotalPkg, InTotalByte, OutTotalPkg, OutTotalByte, SourVpnIndex, DestVpnIndex, Reserved1, EventTrend, Reserved2, Reserved3] = body + print(body) + +def parser_hw_binary(s): + sum_ = 0 + counts = [0, 0, 0, 0, 0] + + len_list = [i * 2 for i in [1, 1, 2, 4, 4, 2, 1, 1]] + head_s = s[0:sum(len_list)] + Count = head_s[4:8] + sum_ += int(Count, 16) + + body_start = sum(len_list) + len_list = [i * 2 for i in [1, 1, 1, 1, 4, 4, 4, 4, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 2, 2, 1, 1, 2, 4]] + for i in range(int(Count, 16)): + body_s = s[body_start:body_start+sum(len_list)] + EventTrend = min(int(body_s[114:116]), 4) + counts[EventTrend] += 1 + body_start += sum(len_list) + return (sum_, counts) + +def parser_nat(s): + counts = [0, 0, 0] + for i in range(20): + body_s = s[i*46*2:(i+1)*46*2] + EventTrend = body_s[36:38] + EventTrend = min(int(body_s[36:38]), 2) + counts[EventTrend] += 1 + return counts + +def read_and_filter_pcap_hw(pcap_file, protocol="UDP"): + nat_num = 0 + count_nums = [0, 0, 0, 0, 0] + + packets = rdpcap(pcap_file) + for pkt in packets: + if protocol in pkt: + # print(packet.show()) # 显示每个符合条件的数据包详情 + udp_payload = bytes(pkt[UDP].payload) + hex_payload = ''.join(f'{byte:02x}' for byte in udp_payload) + (sum_, counts) = parser_hw_binary(hex_payload) + nat_num += sum_ + for i in range(5): + count_nums[i] += counts[i] + + print(f"Total {len(packets)} {protocol} packets in hw_binary found.") + print(f"unknown(00) log:\t{count_nums[0]}/{nat_num}({count_nums[0]/nat_num:.2f}%)") + print(f"build(01) log:\t{count_nums[1]}/{nat_num}({count_nums[1]/nat_num:.2f}%)") + print(f"aged(02) log:\t{count_nums[2]}/{nat_num}({count_nums[2]/nat_num:.2f}%)") + print(f"period(03) log:\t{count_nums[3]}/{nat_num}({count_nums[3]/nat_num:.2f}%)") + print(f"unknown(>03) log:\t{count_nums[4]}/{nat_num}({count_nums[4]/nat_num:.2f}%)") + +def read_and_filter_pcap_nat(pcap_file, protocol="UDP"): + nat_num = 0 + count_nums = [0, 0, 0] + + packets = rdpcap(pcap_file) + for pkt in packets: + if protocol in pkt: + # print(packet.show()) # 显示每个符合条件的数据包详情 + udp_payload = bytes(pkt[UDP].payload) + hex_payload = ''.join(f'{byte:02x}' for byte in udp_payload) + counts = parser_nat(hex_payload) + for i in range(3): + count_nums[i] += counts[i] + + print(f"Total {len(packets)} {protocol} packets in nat_format found.") + nat_num = len(packets)*20 + print(f"add(00) log:\t{count_nums[0]}/{nat_num}({count_nums[0]/nat_num:.2f}%)") + print(f"del(01) log:\t{count_nums[1]}/{nat_num}({count_nums[1]/nat_num:.2f}%)") + print(f"unknown(>01) log:\t{count_nums[2]}/{nat_num}({count_nums[2]/nat_num:.2f}%)") + + +if __name__ == "__main__": + pcap_file_path = "D://MESA//搞点项目//NAT//nat-pcap//hw_binary_1021.pcap" + read_and_filter_pcap_hw(pcap_file_path) + pcap_file_path = "D://MESA//搞点项目//NAT//nat-pcap//nat_1021.pcap" + read_and_filter_pcap_nat(pcap_file_path) + # print('ok!') \ No newline at end of file