perf: Optimize the output of global metrics to reduce atomic operations

This commit is contained in:
luwenpeng
2023-10-12 16:31:53 +08:00
parent 4f870de963
commit 8d1a9b3be5
10 changed files with 265 additions and 160 deletions

View File

@@ -149,8 +149,8 @@ void route_ctx_copy(struct route_ctx *dst, struct route_ctx *src)
void throughput_metrics_inc(struct throughput_metrics *iterm, uint64_t n_pkts, uint64_t n_bytes)
{
ATOMIC_ADD(&iterm->n_bytes, n_bytes);
ATOMIC_ADD(&iterm->n_pkts, n_pkts);
iterm->n_bytes += n_bytes;
iterm->n_pkts += n_pkts;
}
/******************************************************************************

View File

@@ -82,6 +82,17 @@ struct sf_session_metrics
uint64_t log; // 累计值
};
struct thread_metrics
{
struct device_metrics device;
struct raw_pkt_metrics raw_pkt;
struct ctrl_pkt_metrics ctrl_pkt;
struct keepalived_pkt_metrics kee_pkt;
struct sf_status_metrics sf_status;
struct sf_session_metrics sf_session;
};
struct global_metrics
{
struct device_metrics device;
@@ -95,11 +106,16 @@ struct global_metrics
struct metrics_config config;
screen_stat_handle_t fs_handle;
int fs_id[128];
int thread_num;
int *thread_metrics_flag;
struct thread_metrics *thread_metrics_cache;
};
struct global_metrics *global_metrics_create(const char *profile);
void global_metrics_destory(struct global_metrics *metrics);
void global_metrics_dump(struct global_metrics *metrics);
struct global_metrics *global_metrics_create(const char *profile, int thread_num);
void global_metrics_destory(struct global_metrics *global_metrics);
void global_metrics_update(struct global_metrics *global_metrics, struct thread_metrics *thread_metrics, int thread_id);
void global_metrics_dump(struct global_metrics *global_metrics);
#ifdef __cpluscplus
}

View File

@@ -12,6 +12,7 @@ extern "C"
#include "timestamp.h"
#include "packet_io.h"
#include "session_table.h"
#include "global_metrics.h"
#define MAX_THREAD_NUM 128
@@ -29,7 +30,8 @@ struct thread_ctx
struct packet_io *ref_io;
struct sce_ctx *ref_sce_ctx;
struct global_metrics *ref_metrics;
struct thread_metrics thread_metrics;
struct global_metrics *ref_global_metrics;
struct policy_enforcer *ref_enforcer;
int session_table_need_reset;

View File

@@ -209,148 +209,232 @@ static void global_metrics_parse_config(const char *profile, struct metrics_conf
LOG_DEBUG("%s: STAT->prometheus_listen_url : %s", LOG_TAG_METRICS, config->prometheus_listen_url);
}
struct global_metrics *global_metrics_create(const char *profile)
struct global_metrics *global_metrics_create(const char *profile, int thread_num)
{
struct global_metrics *metrics = (struct global_metrics *)calloc(1, sizeof(struct global_metrics));
assert(metrics != NULL);
struct global_metrics *global_metrics = (struct global_metrics *)calloc(1, sizeof(struct global_metrics));
assert(global_metrics != NULL);
global_metrics_parse_config(profile, &metrics->config);
global_metrics->thread_num = thread_num;
global_metrics->thread_metrics_flag = (int *)calloc(global_metrics->thread_num, sizeof(int));
global_metrics->thread_metrics_cache = (struct thread_metrics *)calloc(global_metrics->thread_num, sizeof(struct thread_metrics));
FS_library_set_prometheus_port(metrics->config.prometheus_listen_port);
FS_library_set_prometheus_url_path(metrics->config.prometheus_listen_url);
global_metrics_parse_config(profile, &global_metrics->config);
FS_library_set_prometheus_port(global_metrics->config.prometheus_listen_port);
FS_library_set_prometheus_url_path(global_metrics->config.prometheus_listen_url);
FS_library_init();
int value = 0;
metrics->fs_handle = FS_create_handle(); // TODO memleak no free() API
FS_set_para(metrics->fs_handle, APP_NAME, "SCE", 3);
FS_set_para(metrics->fs_handle, OUTPUT_DEVICE, metrics->config.output_file, strlen(metrics->config.output_file));
global_metrics->fs_handle = FS_create_handle(); // TODO memleak no free() API
FS_set_para(global_metrics->fs_handle, APP_NAME, "SCE", 3);
FS_set_para(global_metrics->fs_handle, OUTPUT_DEVICE, global_metrics->config.output_file, strlen(global_metrics->config.output_file));
value = 1;
FS_set_para(metrics->fs_handle, OUTPUT_PROMETHEUS, &value, sizeof(value));
FS_set_para(global_metrics->fs_handle, OUTPUT_PROMETHEUS, &value, sizeof(value));
value = 1;
FS_set_para(metrics->fs_handle, PRINT_MODE, &value, sizeof(value));
FS_set_para(global_metrics->fs_handle, PRINT_MODE, &value, sizeof(value));
value = 0;
FS_set_para(metrics->fs_handle, CREATE_THREAD, &value, sizeof(value));
FS_set_para(global_metrics->fs_handle, CREATE_THREAD, &value, sizeof(value));
if (strlen(metrics->config.statsd_server) > 0 && metrics->config.statsd_port != 0)
if (strlen(global_metrics->config.statsd_server) > 0 && global_metrics->config.statsd_port != 0)
{
FS_set_para(metrics->fs_handle, STATS_SERVER_IP, metrics->config.statsd_server, strlen(metrics->config.statsd_server));
FS_set_para(metrics->fs_handle, STATS_SERVER_PORT, &(metrics->config.statsd_port), sizeof(metrics->config.statsd_port));
FS_set_para(metrics->fs_handle, STATS_FORMAT, &metrics->config.statsd_format, sizeof(metrics->config.statsd_format));
FS_set_para(global_metrics->fs_handle, STATS_SERVER_IP, global_metrics->config.statsd_server, strlen(global_metrics->config.statsd_server));
FS_set_para(global_metrics->fs_handle, STATS_SERVER_PORT, &(global_metrics->config.statsd_port), sizeof(global_metrics->config.statsd_port));
FS_set_para(global_metrics->fs_handle, STATS_FORMAT, &global_metrics->config.statsd_format, sizeof(global_metrics->config.statsd_format));
}
if (STAT_MAX >= (sizeof(metrics->fs_id) / sizeof(metrics->fs_id[0])))
if (STAT_MAX >= (sizeof(global_metrics->fs_id) / sizeof(global_metrics->fs_id[0])))
{
LOG_ERROR("%s: field stat has insufficient space to store fs_id, and supports a maximum of %lu fsids, but %d is needed ", LOG_TAG_METRICS, (sizeof(metrics->fs_id) / sizeof(metrics->fs_id[0])), STAT_MAX);
global_metrics_destory(metrics);
LOG_ERROR("%s: field stat has insufficient space to store fs_id, and supports a maximum of %lu fsids, but %d is needed ", LOG_TAG_METRICS, (sizeof(global_metrics->fs_id) / sizeof(global_metrics->fs_id[0])), STAT_MAX);
global_metrics_destory(global_metrics);
return NULL;
}
for (int i = 0; i < STAT_MAX; i++)
{
metrics->fs_id[i] = FS_register(metrics->fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, stat_map[i]);
global_metrics->fs_id[i] = FS_register(global_metrics->fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, stat_map[i]);
}
FS_start(metrics->fs_handle);
FS_start(global_metrics->fs_handle);
return metrics;
return global_metrics;
}
void global_metrics_destory(struct global_metrics *metrics)
void global_metrics_destory(struct global_metrics *global_metrics)
{
if (metrics)
if (global_metrics)
{
FS_library_destroy();
free(metrics);
metrics = NULL;
free(global_metrics);
global_metrics = NULL;
}
}
void global_metrics_dump(struct global_metrics *metrics)
#define THREAD_METRICS_CACHE_IS_FREE 0
#define THREAD_METRICS_CACHE_IS_BUSY 0xf
void global_metrics_update(struct global_metrics *global_metrics, struct thread_metrics *thread_metrics, int thread_id)
{
if (ATOMIC_READ(&(global_metrics->thread_metrics_flag[thread_id])) == THREAD_METRICS_CACHE_IS_FREE)
{
struct thread_metrics *ptr_metrics = &global_metrics->thread_metrics_cache[thread_id];
memcpy(ptr_metrics, thread_metrics, sizeof(struct thread_metrics));
memset(thread_metrics, 0, sizeof(struct thread_metrics));
ATOMIC_SET(&(global_metrics->thread_metrics_flag[thread_id]), THREAD_METRICS_CACHE_IS_BUSY);
}
}
void global_metrics_dump(struct global_metrics *global_metrics)
{
for (int i = 0; i < global_metrics->thread_num; i++)
{
if (ATOMIC_READ(&(global_metrics->thread_metrics_flag[i])) == THREAD_METRICS_CACHE_IS_BUSY)
{
struct thread_metrics *thread_metrics = &global_metrics->thread_metrics_cache[i];
global_metrics->device.nf_rx.n_pkts += thread_metrics->device.nf_rx.n_pkts;
global_metrics->device.nf_rx.n_bytes += thread_metrics->device.nf_rx.n_bytes;
global_metrics->device.nf_tx.n_pkts += thread_metrics->device.nf_tx.n_pkts;
global_metrics->device.nf_tx.n_bytes += thread_metrics->device.nf_tx.n_bytes;
global_metrics->device.endpoint_rx.n_pkts += thread_metrics->device.endpoint_rx.n_pkts;
global_metrics->device.endpoint_rx.n_bytes += thread_metrics->device.endpoint_rx.n_bytes;
global_metrics->device.endpoint_tx.n_pkts += thread_metrics->device.endpoint_tx.n_pkts;
global_metrics->device.endpoint_tx.n_bytes += thread_metrics->device.endpoint_tx.n_bytes;
global_metrics->device.endpoint_drop.n_pkts += thread_metrics->device.endpoint_drop.n_pkts;
global_metrics->device.endpoint_drop.n_bytes += thread_metrics->device.endpoint_drop.n_bytes;
global_metrics->raw_pkt.mirr_bypass.n_pkts += thread_metrics->raw_pkt.mirr_bypass.n_pkts;
global_metrics->raw_pkt.mirr_bypass.n_bytes += thread_metrics->raw_pkt.mirr_bypass.n_bytes;
global_metrics->raw_pkt.mirr_block.n_pkts += thread_metrics->raw_pkt.mirr_block.n_pkts;
global_metrics->raw_pkt.mirr_block.n_bytes += thread_metrics->raw_pkt.mirr_block.n_bytes;
global_metrics->raw_pkt.mirr_rx_drop.n_pkts += thread_metrics->raw_pkt.mirr_rx_drop.n_pkts;
global_metrics->raw_pkt.mirr_rx_drop.n_bytes += thread_metrics->raw_pkt.mirr_rx_drop.n_bytes;
global_metrics->raw_pkt.mirr_tx.n_pkts += thread_metrics->raw_pkt.mirr_tx.n_pkts;
global_metrics->raw_pkt.mirr_tx.n_bytes += thread_metrics->raw_pkt.mirr_tx.n_bytes;
global_metrics->raw_pkt.stee_bypass.n_pkts += thread_metrics->raw_pkt.stee_bypass.n_pkts;
global_metrics->raw_pkt.stee_bypass.n_bytes += thread_metrics->raw_pkt.stee_bypass.n_bytes;
global_metrics->raw_pkt.stee_block.n_pkts += thread_metrics->raw_pkt.stee_block.n_pkts;
global_metrics->raw_pkt.stee_block.n_bytes += thread_metrics->raw_pkt.stee_block.n_bytes;
global_metrics->raw_pkt.stee_rx.n_pkts += thread_metrics->raw_pkt.stee_rx.n_pkts;
global_metrics->raw_pkt.stee_rx.n_bytes += thread_metrics->raw_pkt.stee_rx.n_bytes;
global_metrics->raw_pkt.stee_tx.n_pkts += thread_metrics->raw_pkt.stee_tx.n_pkts;
global_metrics->raw_pkt.stee_tx.n_bytes += thread_metrics->raw_pkt.stee_tx.n_bytes;
global_metrics->raw_pkt.miss_sess.n_pkts += thread_metrics->raw_pkt.miss_sess.n_pkts;
global_metrics->raw_pkt.miss_sess.n_bytes += thread_metrics->raw_pkt.miss_sess.n_bytes;
global_metrics->raw_pkt.error_bypass.n_pkts += thread_metrics->raw_pkt.error_bypass.n_pkts;
global_metrics->raw_pkt.error_bypass.n_bytes += thread_metrics->raw_pkt.error_bypass.n_bytes;
global_metrics->raw_pkt.error_block.n_pkts += thread_metrics->raw_pkt.error_block.n_pkts;
global_metrics->raw_pkt.error_block.n_bytes += thread_metrics->raw_pkt.error_block.n_bytes;
global_metrics->ctrl_pkt.rx.n_pkts += thread_metrics->ctrl_pkt.rx.n_pkts;
global_metrics->ctrl_pkt.rx.n_bytes += thread_metrics->ctrl_pkt.rx.n_bytes;
global_metrics->ctrl_pkt.tx.n_pkts += thread_metrics->ctrl_pkt.tx.n_pkts;
global_metrics->ctrl_pkt.tx.n_bytes += thread_metrics->ctrl_pkt.tx.n_bytes;
global_metrics->ctrl_pkt.opening += thread_metrics->ctrl_pkt.opening;
global_metrics->ctrl_pkt.active += thread_metrics->ctrl_pkt.active;
global_metrics->ctrl_pkt.closing += thread_metrics->ctrl_pkt.closing;
global_metrics->ctrl_pkt.resetall += thread_metrics->ctrl_pkt.resetall;
global_metrics->ctrl_pkt.error += thread_metrics->ctrl_pkt.error;
global_metrics->kee_pkt.downlink_rx.n_pkts += thread_metrics->kee_pkt.downlink_rx.n_pkts;
global_metrics->kee_pkt.downlink_rx.n_bytes += thread_metrics->kee_pkt.downlink_rx.n_bytes;
global_metrics->kee_pkt.downlink_tx.n_pkts += thread_metrics->kee_pkt.downlink_tx.n_pkts;
global_metrics->kee_pkt.downlink_tx.n_bytes += thread_metrics->kee_pkt.downlink_tx.n_bytes;
global_metrics->kee_pkt.uplink_rx.n_pkts += thread_metrics->kee_pkt.uplink_rx.n_pkts;
global_metrics->kee_pkt.uplink_rx.n_bytes += thread_metrics->kee_pkt.uplink_rx.n_bytes;
global_metrics->kee_pkt.uplink_tx_drop.n_pkts += thread_metrics->kee_pkt.uplink_tx_drop.n_pkts;
global_metrics->kee_pkt.uplink_tx_drop.n_bytes += thread_metrics->kee_pkt.uplink_tx_drop.n_bytes;
global_metrics->sf_status.active += thread_metrics->sf_status.active;
global_metrics->sf_status.inactive += thread_metrics->sf_status.inactive;
global_metrics->sf_session.num += thread_metrics->sf_session.num;
global_metrics->sf_session.log += thread_metrics->sf_session.log;
memset(thread_metrics, 0, sizeof(struct thread_metrics));
ATOMIC_SET(&(global_metrics->thread_metrics_flag[i]), THREAD_METRICS_CACHE_IS_FREE);
}
}
// device_metrics
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_NF_RX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.nf_rx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_NF_RX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.nf_rx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_NF_RX_PKT], 0, FS_OP_SET, global_metrics->device.nf_rx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_NF_RX_B], 0, FS_OP_SET, global_metrics->device.nf_rx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_NF_TX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.nf_tx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_NF_TX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.nf_tx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_NF_TX_PKT], 0, FS_OP_SET, global_metrics->device.nf_tx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_NF_TX_B], 0, FS_OP_SET, global_metrics->device.nf_tx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_ENDPOINT_RX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.endpoint_rx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_ENDPOINT_RX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.endpoint_rx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_ENDPOINT_RX_PKT], 0, FS_OP_SET, global_metrics->device.endpoint_rx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_ENDPOINT_RX_B], 0, FS_OP_SET, global_metrics->device.endpoint_rx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_ENDPOINT_TX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.endpoint_tx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_ENDPOINT_TX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.endpoint_tx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_ENDPOINT_TX_PKT], 0, FS_OP_SET, global_metrics->device.endpoint_tx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_ENDPOINT_TX_B], 0, FS_OP_SET, global_metrics->device.endpoint_tx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_ENDPOINT_DROP_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.endpoint_drop.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DEVICE_ENDPOINT_DROP_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->device.endpoint_drop.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_ENDPOINT_DROP_PKT], 0, FS_OP_SET, global_metrics->device.endpoint_drop.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_DEVICE_ENDPOINT_DROP_B], 0, FS_OP_SET, global_metrics->device.endpoint_drop.n_bytes);
// raw_pkt_metrics
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MIRR_BYPASS_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.mirr_bypass.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MIRR_BYPASS_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.mirr_bypass.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MIRR_BYPASS_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.mirr_bypass.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MIRR_BYPASS_B], 0, FS_OP_SET, global_metrics->raw_pkt.mirr_bypass.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MIRR_BLOCK_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.mirr_block.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MIRR_BLOCK_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.mirr_block.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MIRR_BLOCK_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.mirr_block.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MIRR_BLOCK_B], 0, FS_OP_SET, global_metrics->raw_pkt.mirr_block.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MIRR_RX_DROP_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.mirr_rx_drop.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MIRR_RX_DROP_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.mirr_rx_drop.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MIRR_RX_DROP_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.mirr_rx_drop.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MIRR_RX_DROP_B], 0, FS_OP_SET, global_metrics->raw_pkt.mirr_rx_drop.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MIRR_TX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.mirr_tx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MIRR_TX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.mirr_tx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MIRR_TX_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.mirr_tx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MIRR_TX_B], 0, FS_OP_SET, global_metrics->raw_pkt.mirr_tx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_STEE_BYPASS_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.stee_bypass.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_STEE_BYPASS_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.stee_bypass.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_STEE_BYPASS_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.stee_bypass.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_STEE_BYPASS_B], 0, FS_OP_SET, global_metrics->raw_pkt.stee_bypass.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_STEE_BLOCK_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.stee_block.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_STEE_BLOCK_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.stee_block.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_STEE_BLOCK_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.stee_block.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_STEE_BLOCK_B], 0, FS_OP_SET, global_metrics->raw_pkt.stee_block.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_STEE_RX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.stee_rx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_STEE_RX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.stee_rx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_STEE_RX_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.stee_rx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_STEE_RX_B], 0, FS_OP_SET, global_metrics->raw_pkt.stee_rx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_STEE_TX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.stee_tx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_STEE_TX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.stee_tx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_STEE_TX_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.stee_tx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_STEE_TX_B], 0, FS_OP_SET, global_metrics->raw_pkt.stee_tx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MISS_SESS_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.miss_sess.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_MISS_SESS_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.miss_sess.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MISS_SESS_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.miss_sess.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_MISS_SESS_B], 0, FS_OP_SET, global_metrics->raw_pkt.miss_sess.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_ERROR_BYPASS_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.error_bypass.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_ERROR_BYPASS_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.error_bypass.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_ERROR_BYPASS_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.error_bypass.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_ERROR_BYPASS_B], 0, FS_OP_SET, global_metrics->raw_pkt.error_bypass.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_ERROR_BLOCK_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.error_block.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_ERROR_BLOCK_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->raw_pkt.error_block.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_ERROR_BLOCK_PKT], 0, FS_OP_SET, global_metrics->raw_pkt.error_block.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_RAW_PKT_ERROR_BLOCK_B], 0, FS_OP_SET, global_metrics->raw_pkt.error_block.n_bytes);
// ctrl_pkt_metrics
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_RX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.rx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_RX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.rx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_RX_PKT], 0, FS_OP_SET, global_metrics->ctrl_pkt.rx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_RX_B], 0, FS_OP_SET, global_metrics->ctrl_pkt.rx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_TX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.tx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_TX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.tx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_TX_PKT], 0, FS_OP_SET, global_metrics->ctrl_pkt.tx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_TX_B], 0, FS_OP_SET, global_metrics->ctrl_pkt.tx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_OPENING], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.opening)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_ACTIVE], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.active)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_CLOSING], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.closing)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_RESETALL], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.resetall)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_ERROR], 0, FS_OP_SET, ATOMIC_READ(&(metrics->ctrl_pkt.error)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_OPENING], 0, FS_OP_SET, global_metrics->ctrl_pkt.opening);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_ACTIVE], 0, FS_OP_SET, global_metrics->ctrl_pkt.active);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_CLOSING], 0, FS_OP_SET, global_metrics->ctrl_pkt.closing);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_RESETALL], 0, FS_OP_SET, global_metrics->ctrl_pkt.resetall);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_CTRL_PKT_ERROR], 0, FS_OP_SET, global_metrics->ctrl_pkt.error);
// keepalived_pkt_metrics
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEE_PKT_DOWN_RX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->kee_pkt.downlink_rx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEE_PKT_DOWN_RX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->kee_pkt.downlink_rx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_KEE_PKT_DOWN_RX_PKT], 0, FS_OP_SET, global_metrics->kee_pkt.downlink_rx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_KEE_PKT_DOWN_RX_B], 0, FS_OP_SET, global_metrics->kee_pkt.downlink_rx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEE_PKT_DOWN_TX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->kee_pkt.downlink_tx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEE_PKT_DOWN_TX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->kee_pkt.downlink_tx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_KEE_PKT_DOWN_TX_PKT], 0, FS_OP_SET, global_metrics->kee_pkt.downlink_tx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_KEE_PKT_DOWN_TX_B], 0, FS_OP_SET, global_metrics->kee_pkt.downlink_tx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEE_PKT_UP_RX_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->kee_pkt.uplink_rx.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEE_PKT_UP_RX_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->kee_pkt.uplink_rx.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_KEE_PKT_UP_RX_PKT], 0, FS_OP_SET, global_metrics->kee_pkt.uplink_rx.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_KEE_PKT_UP_RX_B], 0, FS_OP_SET, global_metrics->kee_pkt.uplink_rx.n_bytes);
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEE_PKT_UP_TX_DROP_PKT], 0, FS_OP_SET, ATOMIC_READ(&(metrics->kee_pkt.uplink_tx_drop.n_pkts)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEE_PKT_UP_TX_DROP_B], 0, FS_OP_SET, ATOMIC_READ(&(metrics->kee_pkt.uplink_tx_drop.n_bytes)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_KEE_PKT_UP_TX_DROP_PKT], 0, FS_OP_SET, global_metrics->kee_pkt.uplink_tx_drop.n_pkts);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_KEE_PKT_UP_TX_DROP_B], 0, FS_OP_SET, global_metrics->kee_pkt.uplink_tx_drop.n_bytes);
// sf_status_metrics
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_SF_STATUS_ACTIVE], 0, FS_OP_SET, ATOMIC_READ(&(metrics->sf_status.active)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_SF_STATUS_INACTIVE], 0, FS_OP_SET, ATOMIC_READ(&(metrics->sf_status.inactive)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_SF_STATUS_ACTIVE], 0, FS_OP_SET, global_metrics->sf_status.active);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_SF_STATUS_INACTIVE], 0, FS_OP_SET, global_metrics->sf_status.inactive);
// sf_session_metrics
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_SF_SESSION_NUM], 0, FS_OP_SET, ATOMIC_READ(&(metrics->sf_session.num)));
FS_operate(metrics->fs_handle, metrics->fs_id[STAT_SF_SESSION_LOG], 0, FS_OP_SET, ATOMIC_READ(&(metrics->sf_session.log)));
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_SF_SESSION_NUM], 0, FS_OP_SET, global_metrics->sf_session.num);
FS_operate(global_metrics->fs_handle, global_metrics->fs_id[STAT_SF_SESSION_LOG], 0, FS_OP_SET, global_metrics->sf_session.log);
FS_passive_output(metrics->fs_handle);
FS_passive_output(global_metrics->fs_handle);
}

View File

@@ -107,6 +107,7 @@ static void *worker_thread_cycle(void *arg)
if (timestamp_get_msec(ts) - sf_metrics_last_send_ts >= sf_metrics_send_interval)
{
global_metrics_update(thread_ctx->ref_global_metrics, &thread_ctx->thread_metrics, thread_ctx->thread_index);
sf_metrics_send(thread_ctx->sf_metrics);
sf_metrics_reset(thread_ctx->sf_metrics);
sf_metrics_last_send_ts = timestamp_get_msec(ts);
@@ -190,7 +191,7 @@ int main(int argc, char **argv)
ctx->work_threads[i].session_table = session_table_create();
ctx->work_threads[i].sf_metrics = sf_metrics_create(profile);
ctx->work_threads[i].ref_io = ctx->io;
ctx->work_threads[i].ref_metrics = ctx->metrics;
ctx->work_threads[i].ref_global_metrics = ctx->metrics;
ctx->work_threads[i].ref_enforcer = ctx->enforcer;
ctx->work_threads[i].ref_sce_ctx = ctx;
ctx->work_threads[i].session_table_need_reset = 0;

View File

@@ -460,30 +460,30 @@ static int action_nf_inject(marsio_buff_t *rx_buff, struct metadata *meta, struc
static void action_err_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
int nsend = action_nf_inject(rx_buff, meta, sf, thread_ctx);
if (nsend > 0)
{
throughput_metrics_inc(&(g_metrics->raw_pkt.error_bypass), 1, nsend);
throughput_metrics_inc(&(thread_metrics->raw_pkt.error_bypass), 1, nsend);
}
}
static void action_err_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct packet_io *packet_io = thread_ctx->ref_io;
int thread_index = thread_ctx->thread_index;
int raw_len = marsio_buff_datalen(rx_buff);
throughput_metrics_inc(&(g_metrics->raw_pkt.error_block), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->raw_pkt.error_block), 1, raw_len);
marsio_buff_free(packet_io->instance, &rx_buff, 1, 0, thread_index);
}
// return nsend
static int action_nf_inject(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct packet_io *packet_io = thread_ctx->ref_io;
int thread_index = thread_ctx->thread_index;
@@ -496,29 +496,29 @@ static int action_nf_inject(marsio_buff_t *rx_buff, struct metadata *meta, struc
int raw_len = marsio_buff_datalen(rx_buff);
marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_index, &rx_buff, 1);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.nf_tx), 1, raw_len);
return raw_len;
}
static void action_mirr_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
int raw_len = marsio_buff_datalen(rx_buff);
throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_bypass), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->raw_pkt.mirr_bypass), 1, raw_len);
}
static void action_mirr_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
int raw_len = marsio_buff_datalen(rx_buff);
throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_block), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->raw_pkt.mirr_block), 1, raw_len);
}
static void action_mirr_forward(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct packet_io *packet_io = thread_ctx->ref_io;
int thread_index = thread_ctx->thread_index;
@@ -536,39 +536,39 @@ static void action_mirr_forward(marsio_buff_t *rx_buff, struct metadata *meta, s
memcpy(copy_ptr, raw_data, raw_len);
int nsend = send_packet_to_sf(new_buff, meta, sf, thread_ctx);
throughput_metrics_inc(&(g_metrics->device.endpoint_tx), 1, nsend);
throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.endpoint_tx), 1, nsend);
throughput_metrics_inc(&(thread_metrics->raw_pkt.mirr_tx), 1, raw_len);
throughput_metrics_inc(&sf->tx, 1, nsend);
sf_metrics_inc(thread_ctx->sf_metrics, sf->rule_vsys_id, sf->rule_id, sf->sff_profile_id, sf->sf_profile_id, 0, 0, 1, nsend);
}
static void action_stee_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
int raw_len = marsio_buff_datalen(rx_buff);
throughput_metrics_inc(&(g_metrics->raw_pkt.stee_bypass), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->raw_pkt.stee_bypass), 1, raw_len);
}
static void action_stee_block(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct packet_io *packet_io = thread_ctx->ref_io;
int thread_index = thread_ctx->thread_index;
int raw_len = marsio_buff_datalen(rx_buff);
throughput_metrics_inc(&(g_metrics->raw_pkt.stee_block), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->raw_pkt.stee_block), 1, raw_len);
marsio_buff_free(packet_io->instance, &rx_buff, 1, 0, thread_index);
}
static void action_stee_forward(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
int raw_len = marsio_buff_datalen(rx_buff);
int nsend = send_packet_to_sf(rx_buff, meta, sf, thread_ctx);
throughput_metrics_inc(&(g_metrics->device.endpoint_tx), 1, nsend);
throughput_metrics_inc(&(g_metrics->raw_pkt.stee_tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.endpoint_tx), 1, nsend);
throughput_metrics_inc(&(thread_metrics->raw_pkt.stee_tx), 1, raw_len);
throughput_metrics_inc(&sf->tx, 1, nsend);
sf_metrics_inc(thread_ctx->sf_metrics, sf->rule_vsys_id, sf->rule_id, sf->sff_profile_id, sf->sf_profile_id, 0, 0, 1, nsend);
}
@@ -748,7 +748,7 @@ static void send_event_log(struct session_ctx *session_ctx, struct thread_ctx *t
{
int nsend = 0;
struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx;
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct selected_chaining *chaining_raw = session_ctx->chainings.chaining_raw;
struct selected_chaining *chaining_decrypted = session_ctx->chainings.chaining_decrypted;
@@ -757,9 +757,9 @@ static void send_event_log(struct session_ctx *session_ctx, struct thread_ctx *t
nsend = send_ctrl_packet(session_ctx, chaining_raw, thread_ctx);
if (nsend > 0)
{
ATOMIC_INC(&(g_metrics->sf_session.log));
throughput_metrics_inc(&(g_metrics->ctrl_pkt.tx), 1, nsend);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, nsend);
ATOMIC_INC(&(thread_metrics->sf_session.log));
throughput_metrics_inc(&(thread_metrics->ctrl_pkt.tx), 1, nsend);
throughput_metrics_inc(&(thread_metrics->device.nf_tx), 1, nsend);
}
}
@@ -768,9 +768,9 @@ static void send_event_log(struct session_ctx *session_ctx, struct thread_ctx *t
nsend = send_ctrl_packet(session_ctx, chaining_decrypted, thread_ctx);
if (nsend > 0)
{
ATOMIC_INC(&(g_metrics->sf_session.log));
throughput_metrics_inc(&(g_metrics->ctrl_pkt.tx), 1, nsend);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, nsend);
ATOMIC_INC(&(thread_metrics->sf_session.log));
throughput_metrics_inc(&(thread_metrics->ctrl_pkt.tx), 1, nsend);
throughput_metrics_inc(&(thread_metrics->device.nf_tx), 1, nsend);
}
}
}
@@ -819,7 +819,7 @@ static void handle_policy_mutil_hits(struct policy_enforcer *enforcer, struct se
static void handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct policy_enforcer *enforcer = thread_ctx->ref_enforcer;
struct session_table *session_table = thread_ctx->session_table;
int chaining_size = policy_enforce_chaining_size(enforcer);
@@ -858,12 +858,12 @@ static void handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
send_event_log(session_ctx, thread_ctx);
session_table_insert(session_table, session_ctx->session_id, &session_ctx->inner_tuple4, session_ctx, session_value_free_cb);
ATOMIC_INC(&(g_metrics->sf_session.num));
ATOMIC_INC(&(thread_metrics->sf_session.num));
}
static void handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct session_table *session_table = thread_ctx->session_table;
struct session_node *node = session_table_search_by_id(session_table, meta->session_id);
@@ -879,7 +879,7 @@ static void handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
dump_sf_metrics(s_ctx, chaining_decrypted, "decrypted_traffic");
session_table_delete_by_id(session_table, meta->session_id);
ATOMIC_DEC(&(g_metrics->sf_session.num));
ATOMIC_DEC(&(thread_metrics->sf_session.num));
}
}
@@ -914,11 +914,11 @@ static void handle_session_active(struct metadata *meta, struct ctrl_pkt_parser
static void handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *ctrl_parser, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx;
LOG_ERROR("%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
ATOMIC_ZERO(&(g_metrics->sf_session.num));
ATOMIC_ZERO(&(thread_metrics->sf_session.num));
for (int i = 0; i < sce_ctx->nr_worker_threads; i++)
{
struct thread_ctx *temp_ctx = &sce_ctx->work_threads[i];
@@ -932,7 +932,7 @@ static void handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parse
static void handle_control_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct metadata meta;
struct ctrl_pkt_parser ctrl_parser;
@@ -958,20 +958,20 @@ static void handle_control_packet(marsio_buff_t *rx_buff, struct thread_ctx *thr
switch (ctrl_parser.state)
{
case SESSION_STATE_OPENING:
ATOMIC_INC(&(g_metrics->ctrl_pkt.opening));
ATOMIC_INC(&(thread_metrics->ctrl_pkt.opening));
// when session opening, firewall not send policy id
// return handle_session_opening(&meta, &ctrl_parser, ctx);
break;
case SESSION_STATE_CLOSING:
ATOMIC_INC(&(g_metrics->ctrl_pkt.closing));
ATOMIC_INC(&(thread_metrics->ctrl_pkt.closing));
handle_session_closing(&meta, &ctrl_parser, thread_ctx);
break;
case SESSION_STATE_ACTIVE:
ATOMIC_INC(&(g_metrics->ctrl_pkt.active));
ATOMIC_INC(&(thread_metrics->ctrl_pkt.active));
handle_session_active(&meta, &ctrl_parser, thread_ctx);
break;
case SESSION_STATE_RESETALL:
ATOMIC_INC(&(g_metrics->ctrl_pkt.resetall));
ATOMIC_INC(&(thread_metrics->ctrl_pkt.resetall));
handle_session_resetall(&meta, &ctrl_parser, thread_ctx);
break;
default:
@@ -980,14 +980,14 @@ static void handle_control_packet(marsio_buff_t *rx_buff, struct thread_ctx *thr
return;
error_ctrl_pkt:
ATOMIC_INC(&(g_metrics->ctrl_pkt.error));
ATOMIC_INC(&(thread_metrics->ctrl_pkt.error));
return;
}
static void handle_raw_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx)
{
struct session_table *session_table = thread_ctx->session_table;
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct metadata meta;
struct session_ctx *session_ctx = NULL;
@@ -1018,7 +1018,7 @@ static void handle_raw_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_
session_ctx = raw_packet_search_session(session_table, meta.raw_data, meta.raw_len, meta.session_id);
if (session_ctx == NULL)
{
throughput_metrics_inc(&(g_metrics->raw_pkt.miss_sess), 1, meta.raw_len);
throughput_metrics_inc(&(thread_metrics->raw_pkt.miss_sess), 1, meta.raw_len);
goto error_bypass;
}
@@ -1048,7 +1048,7 @@ error_bypass:
static void handle_inject_packet(marsio_buff_t *rx_buff, struct thread_ctx *thread_ctx)
{
struct session_table *session_table = thread_ctx->session_table;
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
struct metadata meta;
struct g_vxlan *g_vxlan_hdr = NULL;
@@ -1061,7 +1061,7 @@ static void handle_inject_packet(marsio_buff_t *rx_buff, struct thread_ctx *thre
char *raw_data = marsio_buff_mtod(rx_buff);
if (g_vxlan_decode(&g_vxlan_hdr, raw_data, raw_len) == -1)
{
throughput_metrics_inc(&(g_metrics->device.endpoint_drop), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.endpoint_drop), 1, raw_len);
action_err_block(rx_buff, &meta, NULL, thread_ctx);
return;
}
@@ -1102,14 +1102,14 @@ static void handle_inject_packet(marsio_buff_t *rx_buff, struct thread_ctx *thre
{
LOG_DEBUG("%s: unexpected inject packet, session %lu %s with sf_profile_id %d executes mirror and does not require reflow, drop !!!",
LOG_TAG_PKTIO, session_ctx->session_id, session_ctx->session_addr, chaining->chaining[sf_index].sf_profile_id);
throughput_metrics_inc(&(g_metrics->raw_pkt.mirr_rx_drop), 1, meta.raw_len);
throughput_metrics_inc(&(thread_metrics->raw_pkt.mirr_rx_drop), 1, meta.raw_len);
goto error_block;
}
else
{
struct selected_sf *sf = &(chaining->chaining[sf_index]);
throughput_metrics_inc(&sf->rx, 1, raw_len);
throughput_metrics_inc(&(g_metrics->raw_pkt.stee_rx), 1, meta.raw_len);
throughput_metrics_inc(&(thread_metrics->raw_pkt.stee_rx), 1, meta.raw_len);
sf_metrics_inc(thread_ctx->sf_metrics, sf->rule_vsys_id, sf->rule_id, sf->sff_profile_id, sf->sf_profile_id, 1, raw_len, 0, 0);
}
@@ -1118,7 +1118,7 @@ static void handle_inject_packet(marsio_buff_t *rx_buff, struct thread_ctx *thre
return;
error_block:
throughput_metrics_inc(&(g_metrics->device.endpoint_drop), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.endpoint_drop), 1, raw_len);
marsio_buff_adj(rx_buff, raw_len - meta.raw_len);
action_err_block(rx_buff, &meta, NULL, thread_ctx);
}
@@ -1323,7 +1323,7 @@ void packet_io_thread_wait(struct packet_io *handle, struct thread_ctx *thread_c
int packet_io_thread_polling_nf(struct packet_io *handle, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
int thread_index = thread_ctx->thread_index;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
@@ -1339,8 +1339,8 @@ int packet_io_thread_polling_nf(struct packet_io *handle, struct thread_ctx *thr
{
int raw_len = marsio_buff_datalen(rx_buffs[j]);
throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.nf_tx), 1, raw_len);
}
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, rx_buffs, nr_recv);
@@ -1354,28 +1354,28 @@ int packet_io_thread_polling_nf(struct packet_io *handle, struct thread_ctx *thr
if (is_downlink_keepalive_packet(rx_buff))
{
throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.nf_tx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->kee_pkt.downlink_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->kee_pkt.downlink_tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->kee_pkt.downlink_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->kee_pkt.downlink_tx), 1, raw_len);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, &rx_buff, 1);
}
else if (marsio_buff_is_ctrlbuf(rx_buff))
{
throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->device.nf_tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.nf_tx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->ctrl_pkt.rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->ctrl_pkt.tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->ctrl_pkt.rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->ctrl_pkt.tx), 1, raw_len);
handle_control_packet(rx_buff, thread_ctx);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_index, &rx_buff, 1);
}
else
{
throughput_metrics_inc(&(g_metrics->device.nf_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.nf_rx), 1, raw_len);
handle_raw_packet(rx_buff, thread_ctx);
}
@@ -1386,7 +1386,7 @@ int packet_io_thread_polling_nf(struct packet_io *handle, struct thread_ctx *thr
int packet_io_thread_polling_endpoint(struct packet_io *handle, struct thread_ctx *thread_ctx)
{
struct global_metrics *g_metrics = thread_ctx->ref_metrics;
struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics;
int thread_index = thread_ctx->thread_index;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
@@ -1402,8 +1402,8 @@ int packet_io_thread_polling_endpoint(struct packet_io *handle, struct thread_ct
{
int raw_len = marsio_buff_datalen(rx_buffs[j]);
throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->device.endpoint_tx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.endpoint_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.endpoint_tx), 1, raw_len);
}
marsio_send_burst(handle->dev_endpoint.mr_path, thread_index, rx_buffs, nr_recv);
@@ -1417,15 +1417,15 @@ int packet_io_thread_polling_endpoint(struct packet_io *handle, struct thread_ct
if (is_uplink_keepalive_packet(rx_buff))
{
throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->kee_pkt.uplink_rx), 1, raw_len);
throughput_metrics_inc(&(g_metrics->kee_pkt.uplink_tx_drop), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.endpoint_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->kee_pkt.uplink_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->kee_pkt.uplink_tx_drop), 1, raw_len);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_index);
}
else
{
throughput_metrics_inc(&(g_metrics->device.endpoint_rx), 1, raw_len);
throughput_metrics_inc(&(thread_metrics->device.endpoint_rx), 1, raw_len);
handle_inject_packet(rx_buff, thread_ctx);
}

View File

@@ -1054,7 +1054,7 @@ static void select_sf_by_nearby_and_adminstatus(struct policy_enforcer *enforcer
static enum session_action select_sf_by_ldbc(struct policy_enforcer *enforcer, struct session_ctx *s_ctx, struct sff_param *sff_param, struct selected_sf *sf, struct fixed_num_array *array, uint64_t hash)
{
struct thread_ctx *thread = (struct thread_ctx *)s_ctx->ref_thread_ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct thread_metrics *thread_metrics = &thread->thread_metrics;
struct sf_param *sf_param = NULL;
char buffer[16];
@@ -1086,7 +1086,7 @@ static enum session_action select_sf_by_ldbc(struct policy_enforcer *enforcer, s
memset(sf->sf_dst_mac, 0, sizeof(sf->sf_dst_mac));
if (health_check_session_get_mac(health_check_session_id, sf->sf_dst_mac) == 0)
{
ATOMIC_INC(&(g_metrics->sf_status.active));
ATOMIC_INC(&(thread_metrics->sf_status.active));
sf->sf_profile_id = sf_profile_id;
sf->sf_action_reason = ACTION_FORWAED_DUE_SELECTED_SF;
@@ -1094,7 +1094,7 @@ static enum session_action select_sf_by_ldbc(struct policy_enforcer *enforcer, s
}
else
{
ATOMIC_INC(&(g_metrics->sf_status.inactive));
ATOMIC_INC(&(thread_metrics->sf_status.inactive));
if (sff_param->sff_exception.fail_action == FAILURE_ACTION_RE_DISPATCH)
{

View File

@@ -175,7 +175,7 @@ struct sce_ctx *sce_ctx_create(const char *profile)
}
sce_ctx->ts = timestamp_new(sce_ctx->ts_update_interval_ms);
sce_ctx->metrics = global_metrics_create(profile);
sce_ctx->metrics = global_metrics_create(profile, sce_ctx->nr_worker_threads);
if (sce_ctx->metrics == NULL)
{
goto error_out;

View File

@@ -26,11 +26,11 @@ TEST(POLICY, SELECTED_CHAINING_LIFE_CYCLE)
TEST(POLICY, POLICY_ENFORCER_LIFE_CYCLE)
{
struct global_metrics g_metrics;
struct global_metrics global_metrics;
struct thread_ctx t_ctx;
struct session_ctx s_ctx;
t_ctx.ref_metrics = &g_metrics;
t_ctx.ref_global_metrics = &global_metrics;
s_ctx.ref_thread_ctx = &t_ctx;
s_ctx.session_id = 1;
s_ctx.session_addr = (char *)"1.1.1.1 11 2.2.2.2 22";

View File

@@ -97,7 +97,7 @@ inline struct gtest_frame *gtest_frame_new(const char *json_file, const char *de
thread_ctx->session_table = session_table_create();
thread_ctx->sf_metrics = sf_metrics_create(profile);
thread_ctx->ref_io = sce_ctx->io;
thread_ctx->ref_metrics = sce_ctx->metrics;
thread_ctx->ref_global_metrics = sce_ctx->metrics;
thread_ctx->ref_enforcer = sce_ctx->enforcer;
thread_ctx->ref_sce_ctx = sce_ctx;
thread_ctx->session_table_need_reset = 0;
@@ -169,9 +169,11 @@ inline void gtest_frame_log(struct gtest_frame *instance)
char diffile[1024] = {0};
char cmdline[1024] = {0};
struct sce_ctx *sce_ctx = instance->sce_ctx;
struct thread_ctx *thread_ctx = &sce_ctx->work_threads[0];
for (int i = 0; i < 10; i++)
{
global_metrics_update(sce_ctx->metrics, &thread_ctx->thread_metrics, thread_ctx->thread_index);
global_metrics_dump(sce_ctx->metrics);
usleep(1);
}