fieldstat增加对控制报文类型的计数

This commit is contained in:
luwenpeng
2023-02-23 18:15:21 +08:00
parent ecb9241ce9
commit 679ee1be9c
5 changed files with 112 additions and 34 deletions

View File

@@ -11,7 +11,7 @@ extern "C"
enum session_state
{
SESSION_STATE_OPENING = 1,
SESSION_STATE_CLONING = 2,
SESSION_STATE_CLOSING = 2,
SESSION_STATE_ACTIVE = 3,
SESSION_STATE_RESETALL = 4,
};
@@ -26,6 +26,7 @@ struct ctrl_pkt_parser
int policy_id_num;
};
const char *session_state_to_string(enum session_state state);
void ctrl_packet_parser_init(struct ctrl_pkt_parser *handler);
// return 0 : success

View File

@@ -6,13 +6,13 @@
#include "utils.h"
#include "ctrl_packet.h"
static const char *session_state_to_string(enum session_state state)
const char *session_state_to_string(enum session_state state)
{
switch (state)
{
case SESSION_STATE_OPENING:
return "opening";
case SESSION_STATE_CLONING:
case SESSION_STATE_CLOSING:
return "closing";
case SESSION_STATE_ACTIVE:
return "active";
@@ -80,7 +80,7 @@ int ctrl_packet_parser_parse(struct ctrl_pkt_parser *handler, const char *data,
}
else if (strcasecmp(item->valuestring, "closing") == 0)
{
handler->state = SESSION_STATE_CLONING;
handler->state = SESSION_STATE_CLOSING;
}
else if (strcasecmp(item->valuestring, "resetall") == 0)
{

View File

@@ -34,8 +34,21 @@ struct global_metrics
struct throughput_metrics hit_block_policy; // 累计值
struct throughput_metrics hit_bypass_policy; // 累计值
struct throughput_metrics steering_tx; // 累计值
struct throughput_metrics steering_rx; // 累计值
struct throughput_metrics mirroring_tx; // 累计值
struct throughput_metrics mirroring_rx_drop; // 累计值
struct throughput_metrics keepalive_packet_rx; // 累计值
struct throughput_metrics control_packet_rx; // 累计值
uint64_t control_packet_opening_num; // 累计值
uint64_t control_packet_active_num; // 累计值
uint64_t control_packet_closing_num; // 累计值
uint64_t control_packet_resetall_num; // 累计值
uint64_t control_packet_error_num; // 累计值
uint64_t session_nums; // 瞬时值
struct global_metrics_config config;

View File

@@ -36,13 +36,35 @@ enum SCE_STAT_FIELD
STAT_HIT_BYPASS_POLICY_PKT,
STAT_HIT_BYPASS_POLICY_B,
// steering
STAT_STEERING_TX_PKT,
STAT_STEERING_TX_B,
STAT_STEERING_RX_PKT,
STAT_STEERING_RX_B,
// mirroring
STAT_MIRRORING_TX_PKT,
STAT_MIRRORING_TX_B,
STAT_MIRRORING_RX_DROP_PKT,
STAT_MIRRORING_RX_DROP_B,
// control packet
STAT_CONTROL_RX_PKT,
STAT_CONTROL_RX_B,
STAT_CONTROL_PKT_OPENING,
STAT_CONTROL_PKT_ACTIVE,
STAT_CONTROL_PKT_CLOSING,
STAT_CONTROL_PKT_RESETALL,
STAT_CONTROL_PKT_ERROR,
// current session number
STAT_CURRENT_SESSION_NUMS,
// keepalive packet
STAT_KEEPALIVE_RX_PKT,
STAT_KEEPALIVE_RX_B,
// max
STAT_MAX,
};
@@ -77,13 +99,35 @@ static const char *stat_map[] =
[STAT_HIT_BYPASS_POLICY_PKT] = "hit_bypass_pkt",
[STAT_HIT_BYPASS_POLICY_B] = "hit_bypass_B",
// steering
[STAT_STEERING_TX_PKT] = "stee_tx_pkt",
[STAT_STEERING_TX_B] = "stee_tx_B",
[STAT_STEERING_RX_PKT] = "stee_rx_pkt",
[STAT_STEERING_RX_B] = "stee_rx_B",
// mirroring
[STAT_MIRRORING_TX_PKT] = "mirr_tx_pkt",
[STAT_MIRRORING_TX_B] = "mirr_tx_B",
[STAT_MIRRORING_RX_DROP_PKT] = "mirr_rx_drop_pkt",
[STAT_MIRRORING_RX_DROP_B] = "mirro_rx_drop_B",
// control packet
[STAT_CONTROL_RX_PKT] = "ctrl_rx_pkt",
[STAT_CONTROL_RX_B] = "ctrl_rx_B",
[STAT_CONTROL_PKT_OPENING] = "ctrl_pkt_open",
[STAT_CONTROL_PKT_ACTIVE] = "ctrl_pkt_avtive",
[STAT_CONTROL_PKT_CLOSING] = "ctrl_pkt_close",
[STAT_CONTROL_PKT_RESETALL] = "ctrl_pkt_reset",
[STAT_CONTROL_PKT_ERROR] = "ctrl_pkt_error",
// current session number
[STAT_CURRENT_SESSION_NUMS] = "curr_sess_num",
// keepalive packet
[STAT_KEEPALIVE_RX_PKT] = "kepalive_rx_pkt",
[STAT_KEEPALIVE_RX_B] = "kepalive_rx_B",
[STAT_MAX] = NULL};
static void global_metrics_parse_config(const char *profile, struct global_metrics_config *config)
@@ -162,24 +206,6 @@ void global_metrics_destory(struct global_metrics *metrics)
void global_metrics_dump(struct global_metrics *metrics)
{
if (strlen(metrics->config.statsd_server) == 0)
{
LOG_INFO("%s: dev_endpoint_rx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_rx.n_pkts, metrics->dev_endpoint_rx.n_bytes);
LOG_INFO("%s: dev_endpoint_tx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_tx.n_pkts, metrics->dev_endpoint_tx.n_bytes);
LOG_INFO("%s: dev_endpoint_err_drop : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_err_drop.n_pkts, metrics->dev_endpoint_err_drop.n_bytes);
LOG_INFO("%s: dev_nf_interface_rx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_rx.n_pkts, metrics->dev_nf_interface_rx.n_bytes);
LOG_INFO("%s: dev_nf_interface_tx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_tx.n_pkts, metrics->dev_nf_interface_tx.n_bytes);
LOG_INFO("%s: dev_nf_interface_err_bypass : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_err_bypass.n_pkts, metrics->dev_nf_interface_err_bypass.n_bytes);
LOG_INFO("%s: hit_block_policy : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->hit_block_policy.n_pkts, metrics->hit_block_policy.n_bytes);
LOG_INFO("%s: hit_bypass_policy : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->hit_bypass_policy.n_pkts, metrics->hit_bypass_policy.n_bytes);
LOG_INFO("%s: control_packet : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->control_packet_rx.n_pkts, metrics->control_packet_rx.n_bytes);
LOG_INFO("%s: current_session_num : %6lu", LOG_TAG_METRICS, metrics->session_nums);
}
// dev endpoint
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_rx.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_rx.n_bytes), 0, __ATOMIC_RELAXED));
@@ -190,10 +216,6 @@ void global_metrics_dump(struct global_metrics *metrics)
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_ERR_DROP_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_err_drop.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_ERR_DROP_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_err_drop.n_bytes), 0, __ATOMIC_RELAXED));
// hit policy
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_bytes), 0, __ATOMIC_RELAXED));
// dev nf interface
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_NF_INTERFACE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_nf_interface_rx.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_NF_INTERFACE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_nf_interface_rx.n_bytes), 0, __ATOMIC_RELAXED));
@@ -204,14 +226,42 @@ void global_metrics_dump(struct global_metrics *metrics)
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_NF_INTERFACE_ERR_BYPASS_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_nf_interface_err_bypass.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_NF_INTERFACE_ERR_BYPASS_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_nf_interface_err_bypass.n_bytes), 0, __ATOMIC_RELAXED));
// hit block policy
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_bytes), 0, __ATOMIC_RELAXED));
// hit bypass policy
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BYPASS_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_bypass_policy.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BYPASS_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_bypass_policy.n_bytes), 0, __ATOMIC_RELAXED));
// steering
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_tx.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_tx.n_bytes), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_rx.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_rx.n_bytes), 0, __ATOMIC_RELAXED));
// mirroring
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_tx.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_tx.n_bytes), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_RX_DROP_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_rx_drop.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_RX_DROP_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_rx_drop.n_bytes), 0, __ATOMIC_RELAXED));
// keepalive packet
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEEPALIVE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->keepalive_packet_rx.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEEPALIVE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->keepalive_packet_rx.n_bytes), 0, __ATOMIC_RELAXED));
// control packet
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_rx.n_pkts), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_rx.n_bytes), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_OPENING], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_opening_num), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_ACTIVE], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_active_num), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_CLOSING], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_closing_num), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_RESETALL], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_resetall_num), 0, __ATOMIC_RELAXED));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_PKT_ERROR], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_error_num), 0, __ATOMIC_RELAXED));
// current session number
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CURRENT_SESSION_NUMS], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->session_nums), 0, __ATOMIC_RELAXED));

View File

@@ -556,7 +556,7 @@ static int packet_io_get_metadata(marsio_buff_t *rx_buff, struct metadata *meta)
}
meta->sids.num = marsio_buff_get_sid_list(rx_buff, meta->sids.elems, sizeof(meta->sids.elems) / sizeof(meta->sids.elems[0]));
if (meta->sids.num <= 0)
if (meta->sids.num < 0)
{
LOG_ERROR("%s: unable to get sid_list from metadata", LOG_TAG_PKTIO);
return -1;
@@ -618,7 +618,7 @@ static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta)
}
}
if (meta->sids.num)
if (meta->sids.num > 0)
{
if (marsio_buff_set_sid_list(tx_buff, meta->sids.elems, meta->sids.num) != 0)
{
@@ -639,11 +639,15 @@ static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *met
// return -1 : error
static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
{
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct metadata meta;
if (packet_io_get_metadata(rx_buff, &meta) == -1)
{
LOG_ERROR("%s: unexpected control packet, unable to get metadata", LOG_TAG_PKTIO);
packet_io_dump_metadata(rx_buff, &meta);
__atomic_fetch_add(&g_metrics->control_packet_error_num, 1, __ATOMIC_RELAXED);
return -1;
}
@@ -652,27 +656,37 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf
if (ctrl_packet_parser_parse(&ctrl_parser, meta.raw_data + meta.l7_offset, meta.raw_len - meta.l7_offset) == -1)
{
LOG_ERROR("%s: unexpected control packet, unable to parse data", LOG_TAG_PKTIO);
__atomic_fetch_add(&g_metrics->control_packet_error_num, 1, __ATOMIC_RELAXED);
return -1;
}
if (ctrl_parser.session_id != meta.session_id)
{
LOG_ERROR("%s: unexpected control packet, metadata's session %lu != control packet's session %lu", LOG_TAG_PKTIO, meta.session_id, ctrl_parser.session_id);
__atomic_fetch_add(&g_metrics->control_packet_error_num, 1, __ATOMIC_RELAXED);
return -1;
}
LOG_INFO("%s: recv control packet, session %lu %s", LOG_TAG_PKTIO, ctrl_parser.session_id, session_state_to_string(ctrl_parser.state));
switch (ctrl_parser.state)
{
case SESSION_STATE_OPENING:
__atomic_fetch_add(&g_metrics->control_packet_opening_num, 1, __ATOMIC_RELAXED);
// when session opening, firewall not send policy id
// return handle_session_opening(&meta, &ctrl_parser, thread_seq, ctx);
break;
case SESSION_STATE_CLONING:
case SESSION_STATE_CLOSING:
__atomic_fetch_add(&g_metrics->control_packet_closing_num, 1, __ATOMIC_RELAXED);
return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_ACTIVE:
__atomic_fetch_add(&g_metrics->control_packet_active_num, 1, __ATOMIC_RELAXED);
return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_RESETALL:
__atomic_fetch_add(&g_metrics->control_packet_resetall_num, 1, __ATOMIC_RELAXED);
return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx);
default:
__atomic_fetch_add(&g_metrics->control_packet_error_num, 1, __ATOMIC_RELAXED);
}
return 0;
@@ -1201,7 +1215,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
s_ctx->first_ctrl_pkt.header_len = meta->l7_offset;
s_ctx->chaining = selected_chaining_create(128);
LOG_INFO("%s: session %lu %s opening", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
LOG_INFO("%s: session %lu %s active first", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
for (int i = 0; i < parser->policy_id_num; i++)
{
@@ -1266,7 +1280,7 @@ static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
LOG_INFO("%s: session %lu %s update", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
LOG_INFO("%s: session %lu %s active again", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
for (int i = 0; i < parser->policy_id_num; i++)
{
@@ -1299,7 +1313,7 @@ static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser
struct global_metrics *g_metrics = thread->ref_metrics;
struct sce_ctx *sce_ctx = thread->ref_sce_ctx;
LOG_ERROR("%s: session %lu notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
LOG_ERROR("%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
__atomic_fetch_and(&g_metrics->session_nums, 0, __ATOMIC_RELAXED);