From 542f4cbdfa6f58dd8a07e35663116a08f54170bd Mon Sep 17 00:00:00 2001 From: wangmenglan Date: Tue, 9 May 2023 22:12:38 +0800 Subject: [PATCH] =?UTF-8?q?TSG-14930=20TFE=E6=94=AF=E6=8C=81=E5=8F=91?= =?UTF-8?q?=E9=80=81=E6=8E=A7=E5=88=B6=E6=8A=A5=E6=96=87=E7=BB=99SAPP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/CMakeLists.txt | 2 +- common/include/tfe_packet_io.h | 14 +- .../{tfe_metrics.h => tfe_packet_io_fs.h} | 31 +-- common/include/tfe_utils.h | 12 - common/src/tfe_cmsg.cpp | 8 +- common/src/tfe_metrics.cpp | 176 --------------- common/src/tfe_packet_io.cpp | 103 +++++---- common/src/tfe_packet_io_fs.cpp | 205 ++++++++++++++++++ common/src/tfe_utils.cpp | 57 ----- platform/src/acceptor_kni_v4.cpp | 14 +- platform/src/proxy.cpp | 7 +- platform/src/tcp_stream.cpp | 1 + 12 files changed, 296 insertions(+), 334 deletions(-) rename common/include/{tfe_metrics.h => tfe_packet_io_fs.h} (69%) delete mode 100644 common/src/tfe_metrics.cpp create mode 100644 common/src/tfe_packet_io_fs.cpp diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 9722b0d..ebd99f6 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -5,7 +5,7 @@ src/tap.cpp src/io_uring.cpp src/intercept_policy.cpp src/tfe_fieldstat.cpp src/tfe_addr_tuple4.cpp src/tfe_packet_io.cpp src/tfe_session_table.cpp src/tfe_ctrl_packet.cpp src/tfe_raw_packet.cpp - src/tfe_mpack.cpp src/tfe_metrics.cpp src/mpack.cpp) + src/tfe_mpack.cpp src/tfe_packet_io_fs.cpp src/mpack.cpp ) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/../bpf/) target_include_directories(common PRIVATE ${CMAKE_CURRENT_LIST_DIR}/../platform/include/internal) diff --git a/common/include/tfe_packet_io.h b/common/include/tfe_packet_io.h index 1795cc9..eaf1d74 100644 --- a/common/include/tfe_packet_io.h +++ b/common/include/tfe_packet_io.h @@ -26,7 +26,7 @@ struct tap_ctx char *buff; }; -struct acceptor_thread_ctx +struct packet_io_thread_ctx { pthread_t tid; int thread_index; @@ -36,7 +36,7 @@ struct acceptor_thread_ctx struct sf_metrics *sf_metrics; struct packet_io *ref_io; - struct global_metrics *ref_metrics; + struct packet_io_fs *ret_fs_state; struct policy_enforcer *ref_enforcer; struct tfe_proxy *ref_proxy; struct acceptor_kni_v4 *ref_acceptor_ctx; @@ -71,7 +71,7 @@ struct session_ctx struct tfe_cmsg *cmsg; - struct acceptor_thread_ctx *ref_thread_ctx; + struct packet_io_thread_ctx *ref_thread_ctx; }; struct acceptor_kni_v4 @@ -85,8 +85,8 @@ struct acceptor_kni_v4 cpu_set_t coremask; struct packet_io *io; - struct global_metrics *metrics; - struct acceptor_thread_ctx work_threads[TFE_THREAD_MAX]; + struct packet_io_fs *packet_io_fs; + struct packet_io_thread_ctx work_threads[TFE_THREAD_MAX]; struct tfe_proxy *ref_proxy; }; @@ -96,8 +96,8 @@ int is_enable_iouring(struct packet_io *handle); void tfe_tap_ctx_destory(struct tap_ctx *handler); struct tap_ctx *tfe_tap_ctx_create(void *ctx); -int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx); -void packet_io_thread_wait(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx, int timeout_ms); +int packet_io_thread_init(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx); +void packet_io_thread_wait(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx, int timeout_ms); void packet_io_destory(struct packet_io *handle); struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask); diff --git a/common/include/tfe_metrics.h b/common/include/tfe_packet_io_fs.h similarity index 69% rename from common/include/tfe_metrics.h rename to common/include/tfe_packet_io_fs.h index d96277a..e935f07 100644 --- a/common/include/tfe_metrics.h +++ b/common/include/tfe_packet_io_fs.h @@ -1,5 +1,5 @@ -#ifndef _GLOBAL_METRICS_H -#define _GLOBAL_METRICS_H +#ifndef _TFE_PACKET_IO_FS_H +#define _TFE_PACKET_IO_FS_H #ifdef __cpluscplus extern "C" @@ -9,19 +9,14 @@ extern "C" #include "tfe_utils.h" #include -struct global_metrics_config -{ - char output_file[256]; - char statsd_server[32]; - int statsd_port; - int statsd_format; - int statsd_cycle; - int prometheus_listen_port; - char prometheus_listen_url[256]; +struct throughput_metrics +{ + uint64_t n_pkts; + uint64_t n_bytes; }; -struct global_metrics +struct packet_io_fs { struct throughput_metrics raw_pkt_rx; // 累计值 struct throughput_metrics raw_pkt_tx; // 累计值 @@ -47,20 +42,18 @@ struct global_metrics uint64_t ctrl_pkt_resetall_num; // 累计值 uint64_t ctrl_pkt_error_num; // 累计值 - uint64_t sf_active_times; // 累计值 - uint64_t sf_inactive_times; // 累计值 - uint64_t session_nums; // 瞬时值 uint64_t send_log; // 瞬时值 - struct global_metrics_config config; screen_stat_handle_t fs_handle; int fs_id[128]; }; -struct global_metrics *global_metrics_create(); -void global_metrics_destory(struct global_metrics *metrics); -void global_metrics_dump(struct global_metrics *metrics); +struct packet_io_fs *packet_io_fs_create(); +void packet_io_fs_destory(struct packet_io_fs *handle); +void packet_io_fs_dump(struct packet_io_fs *handle); + +void throughput_metrics_inc(struct throughput_metrics *iterm, uint64_t n_pkts, uint64_t n_bytes); #ifdef __cpluscplus } diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index 7011044..e4d409c 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -225,17 +225,5 @@ struct udp_hdr u_int16_t uh_sum; /* udp checksum */ } __attribute__((__packed__)); -void build_udp_header(const char *l3_hdr, int l3_hdr_len, struct udp_hdr *udp_hdr, u_int16_t udp_sport, u_int16_t udp_dport, int payload_len); -void build_ip_header(struct ip *ip_hdr, u_int8_t next_protocol, const char *src_addr, const char *dst_addr, uint16_t payload_len); -void build_ether_header(struct ethhdr *eth_hdr, uint16_t next_protocol, const char *src_mac, const char *dst_mac); - int str_to_mac(const char *str, char *mac_buff); int get_mac_by_device_name(const char *dev_name, char *mac_buff); - -struct throughput_metrics -{ - uint64_t n_pkts; - uint64_t n_bytes; -}; - -void throughput_metrics_inc(struct throughput_metrics *iterm, uint64_t n_pkts, uint64_t n_bytes); diff --git a/common/src/tfe_cmsg.cpp b/common/src/tfe_cmsg.cpp index d16705b..6f9d2b6 100644 --- a/common/src/tfe_cmsg.cpp +++ b/common/src/tfe_cmsg.cpp @@ -25,7 +25,7 @@ struct tfe_cmsg_tlv struct tfe_cmsg { uint8_t flag; - uint8_t ref; + uint8_t ref; pthread_rwlock_t rwlock; uint16_t nr_tlvs; struct tfe_cmsg_tlv* tlvs[TFE_CMSG_TLV_NR_MAX]; @@ -47,8 +47,8 @@ struct tfe_cmsg* tfe_cmsg_init() pthread_rwlock_init(&(cmsg->rwlock), NULL); ATOMIC_ZERO(&cmsg->flag); - ATOMIC_ZERO(&cmsg->ref); - ATOMIC_INC(&cmsg->ref); + ATOMIC_ZERO(&cmsg->ref); + ATOMIC_INC(&cmsg->ref); return cmsg; } @@ -73,7 +73,7 @@ void tfe_cmsg_dup(struct tfe_cmsg *cmsg) { if (cmsg == NULL) return; - ATOMIC_INC(&cmsg->ref); + ATOMIC_INC(&cmsg->ref); } void tfe_cmsg_set_flag(struct tfe_cmsg *cmsg, uint8_t flag) diff --git a/common/src/tfe_metrics.cpp b/common/src/tfe_metrics.cpp deleted file mode 100644 index aebf9dd..0000000 --- a/common/src/tfe_metrics.cpp +++ /dev/null @@ -1,176 +0,0 @@ -#include -#include -#include -#include - -#include "tfe_proxy.h" -#include "tfe_metrics.h" - -enum SCE_STAT_FIELD -{ - // hit block policy - STAT_HIT_POLICY_PKT, - STAT_HIT_POLICY_B, - - // dev nf interface - STAT_RAW_PKT_RX_PKT, - STAT_RAW_PKT_RX_B, - - STAT_RAW_PKT_TX_PKT, - STAT_RAW_PKT_TX_B, - - STAT_DECRYPTED_TX_PKT, - STAT_DECRYPTED_TX_B, - STAT_DECRYPTED_RX_PKT, - STAT_DECRYPTED_RX_B, - - // control packet - STAT_CONTROL_RX_PKT, - STAT_CONTROL_RX_B, - STAT_CONTROL_TX_PKT, - STAT_CONTROL_TX_B, - - STAT_TAP_RX_PKT, - STAT_TAP_RX_B, - STAT_TAP_TX_PKT, - STAT_TAP_TX_B, - - STAT_TAP_C_RX_PKT, - STAT_TAP_C_RX_B, - STAT_TAP_C_TX_PKT, - STAT_TAP_C_TX_B, - - STAT_TAP_S_RX_PKT, - STAT_TAP_S_RX_B, - STAT_TAP_S_TX_PKT, - STAT_TAP_S_TX_B, - - STAT_CTRL_PKT_OPENING, - STAT_CTRL_PKT_ACTIVE, - STAT_CTRL_PKT_CLOSING, - STAT_CTRL_PKT_RESETALL, - STAT_CTRL_PKT_ERROR, - - // send log - STAT_SEND_LOG, - - // max - STAT_MAX, -}; - -static const char *stat_map[] = -{ - // hit policy - [STAT_HIT_POLICY_PKT] = "hit_policy_pkt", - [STAT_HIT_POLICY_B] = "hit_policy_B", - - // dev nf interface - [STAT_RAW_PKT_RX_PKT] = "raw_rx_pkt", - [STAT_RAW_PKT_RX_B] = "raw_rx_B", - - [STAT_RAW_PKT_TX_PKT] = "raw_tx_pkt", - [STAT_RAW_PKT_TX_B] = "raw_tx_B", - - // decrypted - [STAT_DECRYPTED_TX_PKT] = "decrypt_tx_pkt", - [STAT_DECRYPTED_TX_B] = "decrypt_tx_B", - [STAT_DECRYPTED_RX_PKT] = "decrypt_rx_pkt", - [STAT_DECRYPTED_RX_B] = "decrypt_rx_B", - - // control packet - [STAT_CONTROL_RX_PKT] = "ctrl_rx_pkt", - [STAT_CONTROL_RX_B] = "ctrl_rx_B", - [STAT_CONTROL_TX_PKT] = "ctrl_tx_pkt", - [STAT_CONTROL_TX_B] = "ctrl_tx_B", - - // tap packet - [STAT_TAP_RX_PKT] = "tap_rx_pkt", - [STAT_TAP_RX_B] = "tap_rx_B", - [STAT_TAP_TX_PKT] = "tap_tx_pkt", - [STAT_TAP_TX_B] = "tap_tx_B", - [STAT_TAP_C_RX_PKT] = "tap_c_rx_pkt", - [STAT_TAP_C_RX_B] = "tap_c_rx_B", - [STAT_TAP_C_TX_PKT] = "tap_c_tx_pkt", - [STAT_TAP_C_TX_B] = "tap_c_tx_B", - [STAT_TAP_S_RX_PKT] = "tap_s_rx_pkt", - [STAT_TAP_S_RX_B] = "tap_s_rx_B", - [STAT_TAP_S_TX_PKT] = "tap_s_tx_pkt", - [STAT_TAP_S_TX_B] = "tap_s_tx_B", - - [STAT_CTRL_PKT_OPENING] = "ctrl_pkt_open", - [STAT_CTRL_PKT_ACTIVE] = "ctrl_pkt_avtive", - [STAT_CTRL_PKT_CLOSING] = "ctrl_pkt_close", - [STAT_CTRL_PKT_RESETALL] = "ctrl_pkt_reset", - [STAT_CTRL_PKT_ERROR] = "ctrl_pkt_error", - - // send log - [STAT_SEND_LOG] = "send_log", - - [STAT_MAX] = NULL -}; - -struct global_metrics *global_metrics_create() -{ - struct global_metrics *metrics = (struct global_metrics *)calloc(1, sizeof(struct global_metrics)); - - metrics->fs_handle=tfe_proxy_get_fs_handle(); - for (int i = 0; i < STAT_MAX; i++) - { - metrics->fs_id[i] = FS_register(metrics->fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, stat_map[i]); - } - - return metrics; -} - -void global_metrics_destory(struct global_metrics *metrics) -{ - if (metrics) - { - free(metrics); - metrics = NULL; - } -} - -void global_metrics_dump(struct global_metrics *metrics) -{ - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_policy.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_policy.n_bytes), 0, __ATOMIC_RELAXED)); - - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); - - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_tx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_tx.n_bytes), 0, __ATOMIC_RELAXED)); - - - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DECRYPTED_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->decrypt_tx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DECRYPTED_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->decrypt_tx.n_bytes), 0, __ATOMIC_RELAXED)); - - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DECRYPTED_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->decrypt_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DECRYPTED_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->decrypt_rx.n_bytes), 0, __ATOMIC_RELAXED)); - - // control packet - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); - - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_TAP_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_TAP_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_TAP_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_TAP_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); - - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_TAP_C_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_TAP_C_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_TAP_C_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_TAP_C_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); - - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_OPENING], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_opening_num), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_ACTIVE], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_active_num), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_CLOSING], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_closing_num), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_RESETALL], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_resetall_num), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_ERROR], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_error_num), 0, __ATOMIC_RELAXED)); - - // send log - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_SEND_LOG], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->send_log), 0, __ATOMIC_RELAXED)); -} diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index 40140ac..ef9c8f9 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -20,7 +20,7 @@ #include "tfe_ctrl_packet.h" #include "tfe_raw_packet.h" #include "io_uring.h" -#include "tfe_metrics.h" +#include "tfe_packet_io_fs.h" #include "tfe_cmsg.h" #include "tfe_tcp_restore.h" #include "tfe_stream.h" @@ -162,14 +162,6 @@ extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, s /****************************************************************************** * STATIC ******************************************************************************/ - -static void time_echo(uint64_t session_id, char *info) -{ - time_t t; - time(&t); - TFE_LOG_ERROR(g_default_logger, "%s: session:%lu, time:%s %s", LOG_TAG_PKTIO, session_id, ctime(&t), info); -} - // return 0 : not keepalive packet // return 1 : is keepalive packet static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff) @@ -595,8 +587,8 @@ static int tcp_restore_set_from_cmsg(struct tfe_cmsg *cmsg, struct tcp_restore_i ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (unsigned char *)&wsacle_server, sizeof(uint8_t), &length); if (ret == 0) { - restore_info->client.wscale_perm = true; - restore_info->client.wscale = wsacle_server; + restore_info->server.wscale_perm = true; + restore_info->server.wscale = wsacle_server; } uint8_t sack_client; @@ -889,6 +881,7 @@ static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta) if (meta->is_ctrl_pkt) { + marsio_buff_set_ctrlbuf(tx_buff); if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) != 0) { TFE_LOG_ERROR(g_default_logger, "%s: unable to set l7offset for metadata", LOG_TAG_PKTIO); @@ -952,7 +945,7 @@ static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *met */ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; @@ -979,6 +972,7 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) mpack_writer_t writer; mpack_writer_init_growable(&writer, &data, &size); + // root map mpack_build_map(&writer); mpack_write_cstr(&writer, "tsync"); @@ -994,9 +988,11 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) mpack_write_cstr(&writer, "log_update"); mpack_write_cstr(&writer, "params"); + // params map mpack_build_map(&writer); mpack_write_cstr(&writer, "proxy"); + // proxy map mpack_build_map(&writer); mpack_write_cstr(&writer, "ssl_intercept_info"); @@ -1031,9 +1027,30 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) mpack_write_str(&writer, ssl_error, ssl_error_length); mpack_write_str(&writer, ssl_passthrough_reason, ssl_passthrough_reason_length); mpack_complete_array(&writer); + // proxy map end mpack_complete_map(&writer); + // params map end mpack_complete_map(&writer); + // root map end mpack_complete_map(&writer); + + // finish writing + if (mpack_writer_destroy(&writer) != mpack_ok) + { + assert(0); + if (data) + { + free(data); + data = NULL; + } + return; + } + + struct ethhdr *eth_hdr = (struct ethhdr *)s_ctx->ctrl_meta->raw_data; + struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr)); + struct tcphdr *tcp_hdr = (struct tcphdr *)((char *)ip_hdr + sizeof(struct ip)); + // ip_hdr->ip_len = htons(sizeof(struct ip) + (ntohs(tcp_hdr->th_off) * 4) + size); + ip_hdr->ip_len = htons(sizeof(struct ip) + 20 + size); char *raw_packet_header_data = s_ctx->ctrl_meta->raw_data; int raw_packet_header_len = s_ctx->ctrl_meta->l7offset; @@ -1052,7 +1069,6 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) int nsend = marsio_buff_datalen(tx_buffs[0]); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_seq, tx_buffs, 1); - mpack_writer_destroy(&writer); if (data) free(data); return; @@ -1083,7 +1099,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser struct sockaddr_in *in_addr_client = (struct sockaddr_in *)&restore_info.client.addr; struct sockaddr_in *in_addr_server = (struct sockaddr_in *)&restore_info.server.addr; - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io *packet_io = thread->ref_io; struct raw_pkt_parser raw_parser; @@ -1259,7 +1275,7 @@ end: // return -1 : error static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id); if (!node) @@ -1274,7 +1290,7 @@ static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser * // return -1 : error static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id); if (node) @@ -1292,14 +1308,14 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser // return -1 : error static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; TFE_LOG_ERROR(g_default_logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { - struct acceptor_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; + struct packet_io_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; __atomic_fetch_add(&thread_ctx->session_table_need_reset, 1, __ATOMIC_RELAXED); } @@ -1310,9 +1326,9 @@ static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser // return -1 : error static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; - struct global_metrics *g_metrics = thread->ref_metrics; + struct packet_io_fs *packet_io_fs = thread->ret_fs_state; struct metadata meta; if (packet_io_get_metadata(rx_buff, &meta) == -1) @@ -1340,21 +1356,21 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf switch (ctrl_parser.state) { case SESSION_STATE_OPENING: - __atomic_fetch_add(&g_metrics->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED); // when session opening, firewall not send policy id // return handle_session_opening(&meta, &ctrl_parser, thread_seq, ctx); break; case SESSION_STATE_CLOSING: - __atomic_fetch_add(&g_metrics->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED); return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx); case SESSION_STATE_ACTIVE: - __atomic_fetch_add(&g_metrics->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED); return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx); case SESSION_STATE_RESETALL: - __atomic_fetch_add(&g_metrics->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED); return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx); default: - __atomic_fetch_add(&g_metrics->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED); break; } @@ -1363,9 +1379,9 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; struct packet_io *packet_io = thread->ref_io; - struct global_metrics *g_metrics = thread->ref_metrics; + struct packet_io_fs *packet_io_fs = thread->ret_fs_state; int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); @@ -1380,7 +1396,6 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); return -1; } - time_echo(meta.session_id, "raw pkg from nf start"); struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id); if (node == NULL) @@ -1403,7 +1418,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx else { tfe_tap_write_per_thread(thread->tap_ctx->tap_s, raw_data, raw_len, g_default_logger); } - throughput_metrics_inc(&g_metrics->tap_s_pkt_tx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->tap_s_pkt_tx, 1, raw_len); } // s2c else { @@ -1414,7 +1429,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx else { tfe_tap_write_per_thread(thread->tap_ctx->tap_c, raw_data, raw_len, g_default_logger); } - throughput_metrics_inc(&g_metrics->tap_c_pkt_tx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->tap_c_pkt_tx, 1, raw_len); } } else @@ -1450,7 +1465,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx else { tfe_tap_write_per_thread(thread->tap_ctx->tap_fd, raw_data, raw_len, g_default_logger); } - throughput_metrics_inc(&g_metrics->tap_pkt_tx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->tap_pkt_tx, 1, raw_len); uint8_t flag = tfe_cmsg_get_flag(s_ctx->cmsg); if (flag & TFE_CMSG_FLAG_USER0) { @@ -1459,11 +1474,9 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx } } marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); - time_echo(meta.session_id, "raw pkg from nf end"); return 0; } - /****************************************************************************** * EXTERN ******************************************************************************/ @@ -1495,7 +1508,7 @@ void tfe_tap_ctx_destory(struct tap_ctx *handler) struct tap_ctx *tfe_tap_ctx_create(void *ctx) { int ret = 0; - struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)ctx; + struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)ctx; struct acceptor_kni_v4 *acceptor_ctx = thread_ctx->ref_acceptor_ctx; struct packet_io *packet_io = acceptor_ctx->io; struct tap_ctx *tap_ctx = (struct tap_ctx *)calloc(1, sizeof(struct tap_ctx)); @@ -1553,7 +1566,7 @@ error_out: return NULL; } -int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx) +int packet_io_thread_init(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx) { if (marsio_thread_init(handle->instance) != 0) { @@ -1564,7 +1577,7 @@ int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx * return 0; } -void packet_io_thread_wait(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx, int timeout_ms) +void packet_io_thread_wait(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx, int timeout_ms) { struct mr_vdev *vdevs[] = {handle->dev_nf_interface.mr_dev}; @@ -1679,8 +1692,8 @@ error_out: // return n_packet_recv int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx; - struct global_metrics *g_metrics = thread->ref_metrics; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx; + struct packet_io_fs *packet_io_fs = thread->ret_fs_state; marsio_buff_t *rx_buffs[RX_BURST_MAX]; @@ -1715,14 +1728,14 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi if (marsio_buff_is_ctrlbuf(rx_buff)) { - throughput_metrics_inc(&g_metrics->ctrl_pkt_rx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->ctrl_pkt_rx, 1, raw_len); // all control packet need bypass handle_control_packet(handle, rx_buff, thread_seq, ctx); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); } else { - throughput_metrics_inc(&g_metrics->raw_pkt_rx, 1, raw_len); + throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len); handle_raw_packet_from_nf(handle, rx_buff, thread_seq, ctx); } } @@ -1732,7 +1745,7 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi void handle_decryption_packet_from_tap(const char *data, int len, void *args) { - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; @@ -1752,8 +1765,6 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args) return; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; - time_echo(s_ctx->session_id, "decryption pkg from nf start"); - marsio_buff_t *tx_buffs[1]; int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index); if (alloc_ret < 0){ @@ -1791,14 +1802,13 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args) } packet_io_set_metadata(tx_buffs[0], &meta); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1); - time_echo(s_ctx->session_id, "decryption pkg from nf end"); } void handle_raw_packet_from_tap(const char *data, int len, void *args) { char *src_mac = NULL; char *dst_mac = NULL; - struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args; + struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args; struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; @@ -1818,8 +1828,6 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) return; } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; - time_echo(s_ctx->session_id, "raw pkg from tap start"); - marsio_buff_t *tx_buffs[1]; int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index); if (alloc_ret < 0){ @@ -1867,5 +1875,4 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args) packet_io_set_metadata(tx_buffs[0], &meta); add_ether_header(dst, src_mac, dst_mac); marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1); - time_echo(s_ctx->session_id, "raw pkg from tap end"); } diff --git a/common/src/tfe_packet_io_fs.cpp b/common/src/tfe_packet_io_fs.cpp new file mode 100644 index 0000000..13469b2 --- /dev/null +++ b/common/src/tfe_packet_io_fs.cpp @@ -0,0 +1,205 @@ +#include +#include +#include +#include + +#include "tfe_proxy.h" +#include "tfe_packet_io_fs.h" + +enum PACKET_IO_STAT_FIELD +{ + // hit block policy + STAT_HIT_POLICY_PKT, + STAT_HIT_POLICY_B, + + // dev nf interface + STAT_RAW_PKT_RX_PKT, + STAT_RAW_PKT_RX_B, + + STAT_RAW_PKT_TX_PKT, + STAT_RAW_PKT_TX_B, + + STAT_DECRYPTED_TX_PKT, + STAT_DECRYPTED_TX_B, + STAT_DECRYPTED_RX_PKT, + STAT_DECRYPTED_RX_B, + + // control packet + STAT_CONTROL_RX_PKT, + STAT_CONTROL_RX_B, + STAT_CONTROL_TX_PKT, + STAT_CONTROL_TX_B, + + STAT_TAP_RX_PKT, + STAT_TAP_RX_B, + STAT_TAP_TX_PKT, + STAT_TAP_TX_B, + + STAT_TAP_C_RX_PKT, + STAT_TAP_C_RX_B, + STAT_TAP_C_TX_PKT, + STAT_TAP_C_TX_B, + + STAT_TAP_S_RX_PKT, + STAT_TAP_S_RX_B, + STAT_TAP_S_TX_PKT, + STAT_TAP_S_TX_B, + + STAT_CTRL_PKT_OPENING, + STAT_CTRL_PKT_ACTIVE, + STAT_CTRL_PKT_CLOSING, + STAT_CTRL_PKT_RESETALL, + STAT_CTRL_PKT_ERROR, + + // send log + STAT_SEND_LOG, + + // max + STAT_MAX, +}; + +static const char *stat_map[] = +{ + // hit policy + [STAT_HIT_POLICY_PKT] = "hit_policy_pkt", + [STAT_HIT_POLICY_B] = "hit_policy_B", + + // dev nf interface + [STAT_RAW_PKT_RX_PKT] = "raw_rx_pkt", + [STAT_RAW_PKT_RX_B] = "raw_rx_B", + + [STAT_RAW_PKT_TX_PKT] = "raw_tx_pkt", + [STAT_RAW_PKT_TX_B] = "raw_tx_B", + + // decrypted + [STAT_DECRYPTED_TX_PKT] = "decrypt_tx_pkt", + [STAT_DECRYPTED_TX_B] = "decrypt_tx_B", + [STAT_DECRYPTED_RX_PKT] = "decrypt_rx_pkt", + [STAT_DECRYPTED_RX_B] = "decrypt_rx_B", + + // control packet + [STAT_CONTROL_RX_PKT] = "ctrl_rx_pkt", + [STAT_CONTROL_RX_B] = "ctrl_rx_B", + [STAT_CONTROL_TX_PKT] = "ctrl_tx_pkt", + [STAT_CONTROL_TX_B] = "ctrl_tx_B", + + // tap packet + [STAT_TAP_RX_PKT] = "tap_rx_pkt", + [STAT_TAP_RX_B] = "tap_rx_B", + [STAT_TAP_TX_PKT] = "tap_tx_pkt", + [STAT_TAP_TX_B] = "tap_tx_B", + [STAT_TAP_C_RX_PKT] = "tap_c_rx_pkt", + [STAT_TAP_C_RX_B] = "tap_c_rx_B", + [STAT_TAP_C_TX_PKT] = "tap_c_tx_pkt", + [STAT_TAP_C_TX_B] = "tap_c_tx_B", + [STAT_TAP_S_RX_PKT] = "tap_s_rx_pkt", + [STAT_TAP_S_RX_B] = "tap_s_rx_B", + [STAT_TAP_S_TX_PKT] = "tap_s_tx_pkt", + [STAT_TAP_S_TX_B] = "tap_s_tx_B", + + [STAT_CTRL_PKT_OPENING] = "ctrl_pkt_open", + [STAT_CTRL_PKT_ACTIVE] = "ctrl_pkt_avtive", + [STAT_CTRL_PKT_CLOSING] = "ctrl_pkt_close", + [STAT_CTRL_PKT_RESETALL] = "ctrl_pkt_reset", + [STAT_CTRL_PKT_ERROR] = "ctrl_pkt_error", + + // send log + [STAT_SEND_LOG] = "send_log", + + [STAT_MAX] = NULL +}; + +/****************************************************************************** + * throughput_metrics + ******************************************************************************/ + +void throughput_metrics_inc(struct throughput_metrics *iterm, uint64_t n_pkts, uint64_t n_bytes) +{ + __atomic_fetch_add(&iterm->n_bytes, n_bytes, __ATOMIC_RELAXED); + __atomic_fetch_add(&iterm->n_pkts, n_pkts, __ATOMIC_RELAXED); +} + +struct packet_io_fs *packet_io_fs_create() +{ + int value=0, i=0; + const char* app_name="packet_io"; + const char* fieldstat_output="log/packet_io.fs2"; + + struct packet_io_fs *handle = (struct packet_io_fs *)calloc(1, sizeof(struct packet_io_fs)); + + handle->fs_handle=FS_create_handle(); + FS_set_para(handle->fs_handle, OUTPUT_DEVICE, fieldstat_output, strlen(fieldstat_output)+1); + value=1; + FS_set_para(handle->fs_handle, PRINT_MODE, &value, sizeof(value)); + value=0; + FS_set_para(handle->fs_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(handle->fs_handle, APP_NAME, app_name, strlen(app_name)+1); + + for (int i = 0; i < STAT_MAX; i++) + { + handle->fs_id[i] = FS_register(handle->fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, stat_map[i]); + } + FS_start(handle->fs_handle); + return handle; +} + +void packet_io_fs_destory(struct packet_io_fs *handle) +{ + if (handle) + { + FS_library_destroy(); + free(handle); + handle = NULL; + } +} + +void packet_io_fs_dump(struct packet_io_fs *handle) +{ + if (handle == NULL) + return; + FS_operate(handle->fs_handle, handle->fs_id[STAT_HIT_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->hit_policy.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_HIT_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->hit_policy.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(handle->fs_handle, handle->fs_id[STAT_RAW_PKT_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->raw_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_RAW_PKT_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->raw_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(handle->fs_handle, handle->fs_id[STAT_RAW_PKT_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->raw_pkt_tx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_RAW_PKT_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->raw_pkt_tx.n_bytes), 0, __ATOMIC_RELAXED)); + + + FS_operate(handle->fs_handle, handle->fs_id[STAT_DECRYPTED_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->decrypt_tx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_DECRYPTED_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->decrypt_tx.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(handle->fs_handle, handle->fs_id[STAT_DECRYPTED_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->decrypt_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_DECRYPTED_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->decrypt_rx.n_bytes), 0, __ATOMIC_RELAXED)); + + // control packet + FS_operate(handle->fs_handle, handle->fs_id[STAT_CONTROL_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_CONTROL_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_CONTROL_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_tx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_CONTROL_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_tx.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_pkt_tx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_pkt_tx.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_C_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_c_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_C_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_c_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_C_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_c_pkt_tx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_C_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_c_pkt_tx.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_S_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_S_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_S_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_pkt_tx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_TAP_S_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(handle->tap_pkt_tx.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(handle->fs_handle, handle->fs_id[STAT_CTRL_PKT_OPENING], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_opening_num), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_CTRL_PKT_ACTIVE], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_active_num), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_CTRL_PKT_CLOSING], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_closing_num), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_CTRL_PKT_RESETALL], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_resetall_num), 0, __ATOMIC_RELAXED)); + FS_operate(handle->fs_handle, handle->fs_id[STAT_CTRL_PKT_ERROR], 0, FS_OP_SET, __atomic_fetch_add(&(handle->ctrl_pkt_error_num), 0, __ATOMIC_RELAXED)); + + // send log + FS_operate(handle->fs_handle, handle->fs_id[STAT_SEND_LOG], 0, FS_OP_SET, __atomic_fetch_add(&(handle->send_log), 0, __ATOMIC_RELAXED)); +} diff --git a/common/src/tfe_utils.cpp b/common/src/tfe_utils.cpp index 0ae3492..d5c85c2 100644 --- a/common/src/tfe_utils.cpp +++ b/common/src/tfe_utils.cpp @@ -298,54 +298,6 @@ static int checksum(u_int16_t *addr, int len) return sum; } -void build_udp_header(const char *l3_hdr, int l3_hdr_len, struct udp_hdr *udp_hdr, u_int16_t udp_sport, u_int16_t udp_dport, int payload_len) -{ - memset(udp_hdr, 0, sizeof(struct udp_hdr)); - - int udp_hlen = sizeof(struct udp_hdr) + payload_len; - - udp_hdr->uh_sport = htons(udp_sport); - udp_hdr->uh_dport = htons(udp_dport); - - udp_hdr->uh_ulen = htons(udp_hlen); - udp_hdr->uh_sum = 0; - - int sum = checksum((u_int16_t *)l3_hdr, l3_hdr_len); - sum += ntohs(IPPROTO_UDP + udp_hlen); - sum += checksum((u_int16_t *)udp_hdr, udp_hlen); - udp_hdr->uh_sum = CHECKSUM_CARRY(sum); -} - -void build_ip_header(struct ip *ip_hdr, u_int8_t next_protocol, const char *src_addr, const char *dst_addr, uint16_t payload_len) -{ - memset(ip_hdr, 0, sizeof(struct ip)); - - ip_hdr->ip_hl = 5; /* 20 byte header */ - ip_hdr->ip_v = 4; /* version 4 */ - ip_hdr->ip_tos = 0; /* IP tos */ - ip_hdr->ip_id = htons(random()); /* IP ID */ - ip_hdr->ip_ttl = 80; /* time to live */ - ip_hdr->ip_p = next_protocol; /* transport protocol */ - ip_hdr->ip_src.s_addr = inet_addr(src_addr); - ip_hdr->ip_dst.s_addr = inet_addr(dst_addr); - ip_hdr->ip_len = htons(sizeof(struct ip) + payload_len); /* total length */ - ip_hdr->ip_off = htons(0); /* fragmentation flags */ - ip_hdr->ip_sum = 0; /* do this later */ - - int sum = checksum((u_int16_t *)ip_hdr, 20); - ip_hdr->ip_sum = CHECKSUM_CARRY(sum); -} - -// l3_protocol: ETH_P_IPV6/ETH_P_IP -void build_ether_header(struct ethhdr *eth_hdr, uint16_t next_protocol, const char *src_mac, const char *dst_mac) -{ - memset(eth_hdr, 0, sizeof(struct ethhdr)); - - str_to_mac(src_mac, (char *)eth_hdr->h_source); - str_to_mac(dst_mac, (char *)eth_hdr->h_dest); - eth_hdr->h_proto = htons(next_protocol); -} - int str_to_mac(const char *str, char *mac_buff) { if (sscanf(str, "%02hhx:%02hhx:%02hhx:%02hhx:%02hhx:%02hhx", &(mac_buff[0]), &(mac_buff[1]), &(mac_buff[2]), &(mac_buff[3]), &(mac_buff[4]), &(mac_buff[5])) == 6) @@ -382,12 +334,3 @@ int get_mac_by_device_name(const char *dev_name, char *mac_buff) return 0; } -/****************************************************************************** - * throughput_metrics - ******************************************************************************/ - -void throughput_metrics_inc(struct throughput_metrics *iterm, uint64_t n_pkts, uint64_t n_bytes) -{ - __atomic_fetch_add(&iterm->n_bytes, n_bytes, __ATOMIC_RELAXED); - __atomic_fetch_add(&iterm->n_pkts, n_pkts, __ATOMIC_RELAXED); -} diff --git a/platform/src/acceptor_kni_v4.cpp b/platform/src/acceptor_kni_v4.cpp index ac83cc6..080760c 100644 --- a/platform/src/acceptor_kni_v4.cpp +++ b/platform/src/acceptor_kni_v4.cpp @@ -13,7 +13,7 @@ #include #include #include "io_uring.h" -#include "tfe_metrics.h" +#include "tfe_packet_io_fs.h" #include "tfe_tcp_restore.h" #include "acceptor_kni_v4.h" #include "tap.h" @@ -40,7 +40,7 @@ void acceptor_ctx_destory(struct acceptor_kni_v4 * ctx) if (ctx) { packet_io_destory(ctx->io); - global_metrics_destory(ctx->metrics); + packet_io_fs_destory(ctx->packet_io_fs); free(ctx); ctx = NULL; @@ -72,8 +72,8 @@ struct acceptor_kni_v4 *acceptor_ctx_create(const char *profile) goto error_out; } - ctx->metrics = global_metrics_create(); - if (ctx->metrics == NULL) + ctx->packet_io_fs = packet_io_fs_create(); + if (ctx->packet_io_fs == NULL) { goto error_out; } @@ -87,7 +87,7 @@ error_out: static void *worker_thread_cycle(void *arg) { - struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)arg; + struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)arg; struct packet_io *handle = thread_ctx->ref_io; int pkg_len = 0; @@ -190,13 +190,13 @@ struct acceptor_kni_v4 *acceptor_kni_v4_create(struct tfe_proxy *proxy, const ch acceptor_ctx->work_threads[i].session_table = session_table_create(); acceptor_ctx->work_threads[i].ref_io = acceptor_ctx->io; acceptor_ctx->work_threads[i].ref_proxy = proxy; - acceptor_ctx->work_threads[i].ref_metrics = acceptor_ctx->metrics; + acceptor_ctx->work_threads[i].ret_fs_state = acceptor_ctx->packet_io_fs; acceptor_ctx->work_threads[i].ref_acceptor_ctx = acceptor_ctx; acceptor_ctx->work_threads[i].session_table_need_reset = 0; } for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++) { - struct acceptor_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; + struct packet_io_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i]; if (pthread_create(&thread_ctx->tid, NULL, worker_thread_cycle, (void *)thread_ctx) < 0) { goto error_out; diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 0f159d4..aafd246 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -40,6 +40,8 @@ #include #include #include +#include +#include #include #include @@ -53,8 +55,6 @@ #include #include -#include "tfe_metrics.h" - /* Breakpad */ #include @@ -292,7 +292,8 @@ static void __gc_handler_cb(evutil_socket_t fd, short what, void * arg) FS_operate(ctx->fs_handle, ctx->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(ctx->stat_val[i]))); } - // global_metrics_dump(ctx->kni_v4_acceptor->acceptor->metrics); + if (ctx->kni_v4_acceptor != NULL) + packet_io_fs_dump(ctx->kni_v4_acceptor->packet_io_fs); FS_passive_output(ctx->fs_handle); return; diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index ab6494d..7df5883 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -1533,6 +1533,7 @@ void tfe_stream_destory(struct tfe_stream_private * stream) if (stream->cmsg) { + tfe_cmsg_set_flag(stream->cmsg, TFE_CMSG_FLAG_USER0); tfe_cmsg_destroy(stream->cmsg); }