From 679ee1be9cd5be1a5c1043ee8e8186107290b789 Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Thu, 23 Feb 2023 18:15:21 +0800 Subject: [PATCH] =?UTF-8?q?fieldstat=E5=A2=9E=E5=8A=A0=E5=AF=B9=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E6=8A=A5=E6=96=87=E7=B1=BB=E5=9E=8B=E7=9A=84=E8=AE=A1?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/include/ctrl_packet.h | 3 +- common/src/ctrl_packet.cpp | 6 +- platform/include/global_metrics.h | 17 +++++- platform/src/global_metrics.cpp | 94 +++++++++++++++++++++++-------- platform/src/packet_io.cpp | 26 +++++++-- 5 files changed, 112 insertions(+), 34 deletions(-) diff --git a/common/include/ctrl_packet.h b/common/include/ctrl_packet.h index 09654fc..7ffe43e 100644 --- a/common/include/ctrl_packet.h +++ b/common/include/ctrl_packet.h @@ -11,7 +11,7 @@ extern "C" enum session_state { SESSION_STATE_OPENING = 1, - SESSION_STATE_CLONING = 2, + SESSION_STATE_CLOSING = 2, SESSION_STATE_ACTIVE = 3, SESSION_STATE_RESETALL = 4, }; @@ -26,6 +26,7 @@ struct ctrl_pkt_parser int policy_id_num; }; +const char *session_state_to_string(enum session_state state); void ctrl_packet_parser_init(struct ctrl_pkt_parser *handler); // return 0 : success diff --git a/common/src/ctrl_packet.cpp b/common/src/ctrl_packet.cpp index bd78da8..cf201e6 100644 --- a/common/src/ctrl_packet.cpp +++ b/common/src/ctrl_packet.cpp @@ -6,13 +6,13 @@ #include "utils.h" #include "ctrl_packet.h" -static const char *session_state_to_string(enum session_state state) +const char *session_state_to_string(enum session_state state) { switch (state) { case SESSION_STATE_OPENING: return "opening"; - case SESSION_STATE_CLONING: + case SESSION_STATE_CLOSING: return "closing"; case SESSION_STATE_ACTIVE: return "active"; @@ -80,7 +80,7 @@ int ctrl_packet_parser_parse(struct ctrl_pkt_parser *handler, const char *data, } else if (strcasecmp(item->valuestring, "closing") == 0) { - handler->state = SESSION_STATE_CLONING; + handler->state = SESSION_STATE_CLOSING; } else if (strcasecmp(item->valuestring, "resetall") == 0) { diff --git a/platform/include/global_metrics.h b/platform/include/global_metrics.h index 6075515..cc76336 100644 --- a/platform/include/global_metrics.h +++ b/platform/include/global_metrics.h @@ -33,8 +33,21 @@ struct global_metrics struct throughput_metrics hit_block_policy; // 累计值 struct throughput_metrics hit_bypass_policy; // 累计值 - - struct throughput_metrics control_packet_rx; // 累计值 + + struct throughput_metrics steering_tx; // 累计值 + struct throughput_metrics steering_rx; // 累计值 + + struct throughput_metrics mirroring_tx; // 累计值 + struct throughput_metrics mirroring_rx_drop; // 累计值 + + struct throughput_metrics keepalive_packet_rx; // 累计值 + struct throughput_metrics control_packet_rx; // 累计值 + + uint64_t control_packet_opening_num; // 累计值 + uint64_t control_packet_active_num; // 累计值 + uint64_t control_packet_closing_num; // 累计值 + uint64_t control_packet_resetall_num; // 累计值 + uint64_t control_packet_error_num; // 累计值 uint64_t session_nums; // 瞬时值 diff --git a/platform/src/global_metrics.cpp b/platform/src/global_metrics.cpp index 080f041..3d52133 100644 --- a/platform/src/global_metrics.cpp +++ b/platform/src/global_metrics.cpp @@ -36,13 +36,35 @@ enum SCE_STAT_FIELD STAT_HIT_BYPASS_POLICY_PKT, STAT_HIT_BYPASS_POLICY_B, + // steering + STAT_STEERING_TX_PKT, + STAT_STEERING_TX_B, + STAT_STEERING_RX_PKT, + STAT_STEERING_RX_B, + + // mirroring + STAT_MIRRORING_TX_PKT, + STAT_MIRRORING_TX_B, + STAT_MIRRORING_RX_DROP_PKT, + STAT_MIRRORING_RX_DROP_B, + // control packet STAT_CONTROL_RX_PKT, STAT_CONTROL_RX_B, + STAT_CONTROL_PKT_OPENING, + STAT_CONTROL_PKT_ACTIVE, + STAT_CONTROL_PKT_CLOSING, + STAT_CONTROL_PKT_RESETALL, + STAT_CONTROL_PKT_ERROR, + // current session number STAT_CURRENT_SESSION_NUMS, + // keepalive packet + STAT_KEEPALIVE_RX_PKT, + STAT_KEEPALIVE_RX_B, + // max STAT_MAX, }; @@ -77,13 +99,35 @@ static const char *stat_map[] = [STAT_HIT_BYPASS_POLICY_PKT] = "hit_bypass_pkt", [STAT_HIT_BYPASS_POLICY_B] = "hit_bypass_B", + // steering + [STAT_STEERING_TX_PKT] = "stee_tx_pkt", + [STAT_STEERING_TX_B] = "stee_tx_B", + [STAT_STEERING_RX_PKT] = "stee_rx_pkt", + [STAT_STEERING_RX_B] = "stee_rx_B", + + // mirroring + [STAT_MIRRORING_TX_PKT] = "mirr_tx_pkt", + [STAT_MIRRORING_TX_B] = "mirr_tx_B", + [STAT_MIRRORING_RX_DROP_PKT] = "mirr_rx_drop_pkt", + [STAT_MIRRORING_RX_DROP_B] = "mirro_rx_drop_B", + // control packet [STAT_CONTROL_RX_PKT] = "ctrl_rx_pkt", [STAT_CONTROL_RX_B] = "ctrl_rx_B", + [STAT_CONTROL_PKT_OPENING] = "ctrl_pkt_open", + [STAT_CONTROL_PKT_ACTIVE] = "ctrl_pkt_avtive", + [STAT_CONTROL_PKT_CLOSING] = "ctrl_pkt_close", + [STAT_CONTROL_PKT_RESETALL] = "ctrl_pkt_reset", + [STAT_CONTROL_PKT_ERROR] = "ctrl_pkt_error", + // current session number [STAT_CURRENT_SESSION_NUMS] = "curr_sess_num", + // keepalive packet + [STAT_KEEPALIVE_RX_PKT] = "kepalive_rx_pkt", + [STAT_KEEPALIVE_RX_B] = "kepalive_rx_B", + [STAT_MAX] = NULL}; static void global_metrics_parse_config(const char *profile, struct global_metrics_config *config) @@ -162,24 +206,6 @@ void global_metrics_destory(struct global_metrics *metrics) void global_metrics_dump(struct global_metrics *metrics) { - if (strlen(metrics->config.statsd_server) == 0) - { - LOG_INFO("%s: dev_endpoint_rx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_rx.n_pkts, metrics->dev_endpoint_rx.n_bytes); - LOG_INFO("%s: dev_endpoint_tx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_tx.n_pkts, metrics->dev_endpoint_tx.n_bytes); - LOG_INFO("%s: dev_endpoint_err_drop : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_err_drop.n_pkts, metrics->dev_endpoint_err_drop.n_bytes); - - LOG_INFO("%s: dev_nf_interface_rx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_rx.n_pkts, metrics->dev_nf_interface_rx.n_bytes); - LOG_INFO("%s: dev_nf_interface_tx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_tx.n_pkts, metrics->dev_nf_interface_tx.n_bytes); - LOG_INFO("%s: dev_nf_interface_err_bypass : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_err_bypass.n_pkts, metrics->dev_nf_interface_err_bypass.n_bytes); - - LOG_INFO("%s: hit_block_policy : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->hit_block_policy.n_pkts, metrics->hit_block_policy.n_bytes); - LOG_INFO("%s: hit_bypass_policy : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->hit_bypass_policy.n_pkts, metrics->hit_bypass_policy.n_bytes); - - LOG_INFO("%s: control_packet : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->control_packet_rx.n_pkts, metrics->control_packet_rx.n_bytes); - - LOG_INFO("%s: current_session_num : %6lu", LOG_TAG_METRICS, metrics->session_nums); - } - // dev endpoint FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_rx.n_bytes), 0, __ATOMIC_RELAXED)); @@ -190,10 +216,6 @@ void global_metrics_dump(struct global_metrics *metrics) FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_ERR_DROP_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_err_drop.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_ERR_DROP_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_err_drop.n_bytes), 0, __ATOMIC_RELAXED)); - // hit policy - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_bytes), 0, __ATOMIC_RELAXED)); - // dev nf interface FS_operate(metrics->fs_handle, metrics->fs_id[STAT_NF_INTERFACE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_nf_interface_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_NF_INTERFACE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_nf_interface_rx.n_bytes), 0, __ATOMIC_RELAXED)); @@ -204,14 +226,42 @@ void global_metrics_dump(struct global_metrics *metrics) FS_operate(metrics->fs_handle, metrics->fs_id[STAT_NF_INTERFACE_ERR_BYPASS_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_nf_interface_err_bypass.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_NF_INTERFACE_ERR_BYPASS_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_nf_interface_err_bypass.n_bytes), 0, __ATOMIC_RELAXED)); + // hit block policy + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_bytes), 0, __ATOMIC_RELAXED)); + // hit bypass policy FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BYPASS_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_bypass_policy.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BYPASS_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_bypass_policy.n_bytes), 0, __ATOMIC_RELAXED)); + // steering + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_tx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_tx.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_rx.n_bytes), 0, __ATOMIC_RELAXED)); + + // mirroring + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_tx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_tx.n_bytes), 0, __ATOMIC_RELAXED)); + + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_RX_DROP_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_rx_drop.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_RX_DROP_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_rx_drop.n_bytes), 0, __ATOMIC_RELAXED)); + + // keepalive packet + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEEPALIVE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->keepalive_packet_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEEPALIVE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->keepalive_packet_rx.n_bytes), 0, __ATOMIC_RELAXED)); + // control packet FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_rx.n_bytes), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_OPENING], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_opening_num), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_ACTIVE], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_active_num), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_CLOSING], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_closing_num), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_RESETALL], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_resetall_num), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_ERROR], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_error_num), 0, __ATOMIC_RELAXED)); + // current session number FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CURRENT_SESSION_NUMS], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->session_nums), 0, __ATOMIC_RELAXED)); diff --git a/platform/src/packet_io.cpp b/platform/src/packet_io.cpp index c5b21f1..c286185 100644 --- a/platform/src/packet_io.cpp +++ b/platform/src/packet_io.cpp @@ -556,7 +556,7 @@ static int packet_io_get_metadata(marsio_buff_t *rx_buff, struct metadata *meta) } meta->sids.num = marsio_buff_get_sid_list(rx_buff, meta->sids.elems, sizeof(meta->sids.elems) / sizeof(meta->sids.elems[0])); - if (meta->sids.num <= 0) + if (meta->sids.num < 0) { LOG_ERROR("%s: unable to get sid_list from metadata", LOG_TAG_PKTIO); return -1; @@ -618,7 +618,7 @@ static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta) } } - if (meta->sids.num) + if (meta->sids.num > 0) { if (marsio_buff_set_sid_list(tx_buff, meta->sids.elems, meta->sids.num) != 0) { @@ -639,11 +639,15 @@ static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *met // return -1 : error static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx) { + struct thread_ctx *thread = (struct thread_ctx *)ctx; + struct global_metrics *g_metrics = thread->ref_metrics; + struct metadata meta; if (packet_io_get_metadata(rx_buff, &meta) == -1) { LOG_ERROR("%s: unexpected control packet, unable to get metadata", LOG_TAG_PKTIO); packet_io_dump_metadata(rx_buff, &meta); + __atomic_fetch_add(&g_metrics->control_packet_error_num, 1, __ATOMIC_RELAXED); return -1; } @@ -652,27 +656,37 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf if (ctrl_packet_parser_parse(&ctrl_parser, meta.raw_data + meta.l7_offset, meta.raw_len - meta.l7_offset) == -1) { LOG_ERROR("%s: unexpected control packet, unable to parse data", LOG_TAG_PKTIO); + __atomic_fetch_add(&g_metrics->control_packet_error_num, 1, __ATOMIC_RELAXED); return -1; } if (ctrl_parser.session_id != meta.session_id) { LOG_ERROR("%s: unexpected control packet, metadata's session %lu != control packet's session %lu", LOG_TAG_PKTIO, meta.session_id, ctrl_parser.session_id); + __atomic_fetch_add(&g_metrics->control_packet_error_num, 1, __ATOMIC_RELAXED); return -1; } + LOG_INFO("%s: recv control packet, session %lu %s", LOG_TAG_PKTIO, ctrl_parser.session_id, session_state_to_string(ctrl_parser.state)); + switch (ctrl_parser.state) { case SESSION_STATE_OPENING: + __atomic_fetch_add(&g_metrics->control_packet_opening_num, 1, __ATOMIC_RELAXED); // when session opening, firewall not send policy id // return handle_session_opening(&meta, &ctrl_parser, thread_seq, ctx); break; - case SESSION_STATE_CLONING: + case SESSION_STATE_CLOSING: + __atomic_fetch_add(&g_metrics->control_packet_closing_num, 1, __ATOMIC_RELAXED); return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx); case SESSION_STATE_ACTIVE: + __atomic_fetch_add(&g_metrics->control_packet_active_num, 1, __ATOMIC_RELAXED); return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx); case SESSION_STATE_RESETALL: + __atomic_fetch_add(&g_metrics->control_packet_resetall_num, 1, __ATOMIC_RELAXED); return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx); + default: + __atomic_fetch_add(&g_metrics->control_packet_error_num, 1, __ATOMIC_RELAXED); } return 0; @@ -1201,7 +1215,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser s_ctx->first_ctrl_pkt.header_len = meta->l7_offset; s_ctx->chaining = selected_chaining_create(128); - LOG_INFO("%s: session %lu %s opening", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); + LOG_INFO("%s: session %lu %s active first", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); for (int i = 0; i < parser->policy_id_num; i++) { @@ -1266,7 +1280,7 @@ static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser * } struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; - LOG_INFO("%s: session %lu %s update", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); + LOG_INFO("%s: session %lu %s active again", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); for (int i = 0; i < parser->policy_id_num; i++) { @@ -1299,7 +1313,7 @@ static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser struct global_metrics *g_metrics = thread->ref_metrics; struct sce_ctx *sce_ctx = thread->ref_sce_ctx; - LOG_ERROR("%s: session %lu notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); + LOG_ERROR("%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id); __atomic_fetch_and(&g_metrics->session_nums, 0, __ATOMIC_RELAXED);