#include #include #include #include #include "log.h" #include "global_metrics.h" enum SCE_STAT_FIELD { // dev endpoint STAT_ENDPOINT_RX_PKT, STAT_ENDPOINT_RX_B, STAT_ENDPOINT_TX_PKT, STAT_ENDPOINT_TX_B, STAT_ENDPOINT_ERR_DROP_PKT, STAT_ENDPOINT_ERR_DROP_B, // hit block policy STAT_HIT_BLOCK_POLICY_PKT, STAT_HIT_BLOCK_POLICY_B, // dev nf interface STAT_RAW_PKT_RX_PKT, STAT_RAW_PKT_RX_B, STAT_RAW_PKT_TX_PKT, STAT_RAW_PKT_TX_B, STAT_RAW_PKT_ERR_BYPASS_PKT, STAT_RAW_PKT_ERR_BYPASS_B, // hit bypass policy STAT_HIT_BYPASS_POLICY_PKT, STAT_HIT_BYPASS_POLICY_B, // steering STAT_STEERING_TX_PKT, STAT_STEERING_TX_B, STAT_STEERING_RX_PKT, STAT_STEERING_RX_B, // mirroring STAT_MIRRORING_TX_PKT, STAT_MIRRORING_TX_B, STAT_MIRRORING_RX_DROP_PKT, STAT_MIRRORING_RX_DROP_B, // control packet STAT_CONTROL_RX_PKT, STAT_CONTROL_RX_B, STAT_CTRL_PKT_OPENING, STAT_CTRL_PKT_ACTIVE, STAT_CTRL_PKT_CLOSING, STAT_CTRL_PKT_RESETALL, STAT_CTRL_PKT_ERROR, // current session number STAT_CURRENT_SESSION_NUMS, // keepalive packet STAT_DOWNLINK_KEEPALIVE_RX_PKT, STAT_DOWNLINK_KEEPALIVE_RX_B, STAT_UPLINK_KEEPALIVE_RX_PKT, STAT_UPLINK_KEEPALIVE_RX_B, // health check STAT_SF_ACTIVE_TIMES, STAT_SF_INACTIVE_TIMES, // send log STAT_SEND_LOG, // max STAT_MAX, }; static const char *stat_map[] = { // dev endpoint [STAT_ENDPOINT_RX_PKT] = "endp_rx_pkt", [STAT_ENDPOINT_RX_B] = "endp_rx_B", [STAT_ENDPOINT_TX_PKT] = "endp_tx_pkt", [STAT_ENDPOINT_TX_B] = "endp_tx_B", [STAT_ENDPOINT_ERR_DROP_PKT] = "endp_edrop_pkt", [STAT_ENDPOINT_ERR_DROP_B] = "endp_edrop_B", // hit block policy [STAT_HIT_BLOCK_POLICY_PKT] = "hit_block_pkt", [STAT_HIT_BLOCK_POLICY_B] = "hit_block_B", // dev nf interface [STAT_RAW_PKT_RX_PKT] = "raw_rx_pkt", [STAT_RAW_PKT_RX_B] = "raw_rx_B", [STAT_RAW_PKT_TX_PKT] = "raw_tx_pkt", [STAT_RAW_PKT_TX_B] = "raw_tx_B", [STAT_RAW_PKT_ERR_BYPASS_PKT] = "raw_ebypass_pkt", [STAT_RAW_PKT_ERR_BYPASS_B] = "raw_ebypass_B", // hit bypass policy [STAT_HIT_BYPASS_POLICY_PKT] = "hit_bypass_pkt", [STAT_HIT_BYPASS_POLICY_B] = "hit_bypass_B", // steering [STAT_STEERING_TX_PKT] = "stee_tx_pkt", [STAT_STEERING_TX_B] = "stee_tx_B", [STAT_STEERING_RX_PKT] = "stee_rx_pkt", [STAT_STEERING_RX_B] = "stee_rx_B", // mirroring [STAT_MIRRORING_TX_PKT] = "mirr_tx_pkt", [STAT_MIRRORING_TX_B] = "mirr_tx_B", [STAT_MIRRORING_RX_DROP_PKT] = "mirr_rx_dop_pkt", [STAT_MIRRORING_RX_DROP_B] = "mirr_rx_dop_B", // control packet [STAT_CONTROL_RX_PKT] = "ctrl_rx_pkt", [STAT_CONTROL_RX_B] = "ctrl_rx_B", [STAT_CTRL_PKT_OPENING] = "ctrl_pkt_open", [STAT_CTRL_PKT_ACTIVE] = "ctrl_pkt_avtive", [STAT_CTRL_PKT_CLOSING] = "ctrl_pkt_close", [STAT_CTRL_PKT_RESETALL] = "ctrl_pkt_reset", [STAT_CTRL_PKT_ERROR] = "ctrl_pkt_error", // current session number [STAT_CURRENT_SESSION_NUMS] = "curr_sess_num", // keepalive packet [STAT_DOWNLINK_KEEPALIVE_RX_PKT] = "dlnk_kep_rx_pkt", [STAT_DOWNLINK_KEEPALIVE_RX_B] = "dlnk_kep_rx_B", [STAT_UPLINK_KEEPALIVE_RX_PKT] = "ulnk_kep_rx_pkt", [STAT_UPLINK_KEEPALIVE_RX_B] = "ulnk_kep_rx_B", // health check [STAT_SF_ACTIVE_TIMES] = "sf_active", [STAT_SF_INACTIVE_TIMES] = "sf_inactive", // send log [STAT_SEND_LOG] = "send_log", [STAT_MAX] = NULL}; static void global_metrics_parse_config(const char *profile, struct global_metrics_config *config) { MESA_load_profile_string_def(profile, "STAT", "output_file", config->output_file, sizeof(config->output_file), "log/sce.fs2"); MESA_load_profile_string_def(profile, "STAT", "statsd_server", config->statsd_server, sizeof(config->statsd_server), "127.0.0.1"); MESA_load_profile_int_def(profile, "STAT", "statsd_port", &(config->statsd_port), 8100); MESA_load_profile_int_def(profile, "STAT", "statsd_format", &(config->statsd_format), 1); // FS_OUTPUT_STATSD=1, FS_OUTPUT_INFLUX_LINE=2 MESA_load_profile_int_def(profile, "STAT", "statsd_cycle", &(config->statsd_cycle), 1); MESA_load_profile_int_def(profile, "STAT", "prometheus_listen_port", &(config->prometheus_listen_port), 9001); MESA_load_profile_string_def(profile, "STAT", "prometheus_listen_url", config->prometheus_listen_url, sizeof(config->prometheus_listen_url), "/sce_prometheus"); if (config->statsd_format != 1 && config->statsd_format != 2) { config->statsd_format = 1; } LOG_DEBUG("%s: STAT->output_file : %s", LOG_TAG_METRICS, config->output_file); LOG_DEBUG("%s: STAT->statsd_server : %s", LOG_TAG_METRICS, config->statsd_server); LOG_DEBUG("%s: STAT->statsd_port : %d", LOG_TAG_METRICS, config->statsd_port); LOG_DEBUG("%s: STAT->statsd_format : %d", LOG_TAG_METRICS, config->statsd_format); LOG_DEBUG("%s: STAT->statsd_cycle : %d", LOG_TAG_METRICS, config->statsd_cycle); LOG_DEBUG("%s: STAT->prometheus_listen_port : %d", LOG_TAG_METRICS, config->prometheus_listen_port); LOG_DEBUG("%s: STAT->prometheus_listen_url : %s", LOG_TAG_METRICS, config->prometheus_listen_url); } struct global_metrics *global_metrics_create(const char *profile) { struct global_metrics *metrics = (struct global_metrics *)calloc(1, sizeof(struct global_metrics)); assert(metrics != NULL); global_metrics_parse_config(profile, &metrics->config); FS_library_set_prometheus_port(metrics->config.prometheus_listen_port); FS_library_set_prometheus_url_path(metrics->config.prometheus_listen_url); FS_library_init(); int value = 0; metrics->fs_handle = FS_create_handle(); // TODO memleak no free() API FS_set_para(metrics->fs_handle, APP_NAME, "SCE", 3); FS_set_para(metrics->fs_handle, OUTPUT_DEVICE, metrics->config.output_file, strlen(metrics->config.output_file)); value = 1; FS_set_para(metrics->fs_handle, OUTPUT_PROMETHEUS, &value, sizeof(value)); value = 1; FS_set_para(metrics->fs_handle, PRINT_MODE, &value, sizeof(value)); value = 0; FS_set_para(metrics->fs_handle, CREATE_THREAD, &value, sizeof(value)); if (strlen(metrics->config.statsd_server) > 0 && metrics->config.statsd_port != 0) { FS_set_para(metrics->fs_handle, STATS_SERVER_IP, metrics->config.statsd_server, strlen(metrics->config.statsd_server)); FS_set_para(metrics->fs_handle, STATS_SERVER_PORT, &(metrics->config.statsd_port), sizeof(metrics->config.statsd_port)); FS_set_para(metrics->fs_handle, STATS_FORMAT, &metrics->config.statsd_format, sizeof(metrics->config.statsd_format)); } if (STAT_MAX >= (sizeof(metrics->fs_id) / sizeof(metrics->fs_id[0]))) { LOG_ERROR("%s: field stat has insufficient space to store fs_id, and supports a maximum of %lu fsids, but %d is needed ", LOG_TAG_METRICS, (sizeof(metrics->fs_id) / sizeof(metrics->fs_id[0])), STAT_MAX); global_metrics_destory(metrics); return NULL; } for (int i = 0; i < STAT_MAX; i++) { metrics->fs_id[i] = FS_register(metrics->fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, stat_map[i]); } FS_start(metrics->fs_handle); return metrics; } void global_metrics_destory(struct global_metrics *metrics) { if (metrics) { FS_library_destroy(); free(metrics); metrics = NULL; } } void global_metrics_dump(struct global_metrics *metrics) { // dev endpoint FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_rx.n_bytes), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_tx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_tx.n_bytes), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_ERR_DROP_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_err_drop.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_ENDPOINT_ERR_DROP_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->dev_endpoint_err_drop.n_bytes), 0, __ATOMIC_RELAXED)); // dev nf interface FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_tx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_tx.n_bytes), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_ERR_BYPASS_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_err_bypass.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_RAW_PKT_ERR_BYPASS_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->raw_pkt_err_bypass.n_bytes), 0, __ATOMIC_RELAXED)); // hit block policy FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BLOCK_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_block_policy.n_bytes), 0, __ATOMIC_RELAXED)); // hit bypass policy FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BYPASS_POLICY_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_bypass_policy.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_HIT_BYPASS_POLICY_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->hit_bypass_policy.n_bytes), 0, __ATOMIC_RELAXED)); // steering FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_tx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_tx.n_bytes), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_STEERING_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->steering_rx.n_bytes), 0, __ATOMIC_RELAXED)); // mirroring FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_TX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_tx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_TX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_tx.n_bytes), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_RX_DROP_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_rx_drop.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_RX_DROP_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_rx_drop.n_bytes), 0, __ATOMIC_RELAXED)); // keepalive packet FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DOWNLINK_KEEPALIVE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->downlink_keepalive_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DOWNLINK_KEEPALIVE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->downlink_keepalive_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_UPLINK_KEEPALIVE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->uplink_keepalive_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_UPLINK_KEEPALIVE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->uplink_keepalive_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); // control packet FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_pkts), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_rx.n_bytes), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_OPENING], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_opening_num), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_ACTIVE], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_active_num), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_CLOSING], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_closing_num), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_RESETALL], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_resetall_num), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CTRL_PKT_ERROR], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->ctrl_pkt_error_num), 0, __ATOMIC_RELAXED)); // current session number FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CURRENT_SESSION_NUMS], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->session_nums), 0, __ATOMIC_RELAXED)); // health check FS_operate(metrics->fs_handle, metrics->fs_id[STAT_SF_ACTIVE_TIMES], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->sf_active_times), 0, __ATOMIC_RELAXED)); FS_operate(metrics->fs_handle, metrics->fs_id[STAT_SF_INACTIVE_TIMES], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->sf_inactive_times), 0, __ATOMIC_RELAXED)); // send log FS_operate(metrics->fs_handle, metrics->fs_id[STAT_SEND_LOG], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->send_log), 0, __ATOMIC_RELAXED)); FS_passive_output(metrics->fs_handle); }