From c8d40f1347937612143daf6eceac697f57150ad1 Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Fri, 19 Jul 2024 10:02:07 +0800 Subject: [PATCH] feature: TSG-21852 service_function_status support fieldstat4 --- common/src/kafka.cpp | 6 + common/test/gtest_health_check_table.cpp | 42 ++-- conf/sce.conf | 5 - platform/include/health_check.h | 4 +- platform/include/sf_status.h | 20 +- platform/src/health_check.cpp | 61 +++--- platform/src/main.cpp | 5 +- platform/src/policy.cpp | 26 ++- platform/src/sce.cpp | 2 + platform/src/sf_metrics.cpp | 2 +- platform/src/sf_status.cpp | 232 ++++++++++------------- platform/test/gtest_sf_status.cpp | 34 +++- platform/test/test_resource/sce.conf | 5 - test/gtest_utils.h | 2 - test/test_data/conf/sce.conf | 5 - 15 files changed, 222 insertions(+), 229 deletions(-) diff --git a/common/src/kafka.cpp b/common/src/kafka.cpp index 6fd93a4..c934015 100644 --- a/common/src/kafka.cpp +++ b/common/src/kafka.cpp @@ -216,6 +216,12 @@ void kafka_destroy(struct kafka *handle) int kafka_send(struct kafka *handle, enum topic_idx idx, const char *data, int len) { + if (!handle) + { + LOG_ERROR("%s: handle is NULL", LOG_TAG_KAFKA); + return -1; + } + if (idx < 0 || idx >= MAX_TOPIC_NUM) { LOG_ERROR("%s: invalid topic index: %d", LOG_TAG_KAFKA, idx); diff --git a/common/test/gtest_health_check_table.cpp b/common/test/gtest_health_check_table.cpp index d3ae50d..f48892d 100644 --- a/common/test/gtest_health_check_table.cpp +++ b/common/test/gtest_health_check_table.cpp @@ -13,7 +13,7 @@ TEST(HEALTH_CHECK_TABLE, INSERT) uint64_t session_id4 = 0; uint64_t session_id5 = 0; // TEST Create - health_check_session_init("./sce.conf"); + health_check_session_init("./sce.conf", NULL); // TEST Insert struct health_check policy1; @@ -43,11 +43,11 @@ TEST(HEALTH_CHECK_TABLE, INSERT) EXPECT_TRUE((session_id5 = health_check_session_add(5, 0, &policy3)) > 0); // TEST Delete By Session ID - EXPECT_TRUE(health_check_session_del(session_id1, 1) == 0); - EXPECT_TRUE(health_check_session_del(session_id2, 2) == 0); - EXPECT_TRUE(health_check_session_del(session_id3, 3) == 0); - EXPECT_TRUE(health_check_session_del(session_id4, 4) == 0); - EXPECT_TRUE(health_check_session_del(session_id5, 5) == 0); + EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id2, 2, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id4, 4, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id5, 5, 0) == 0); } TEST(HEALTH_CHECK_TABLE, GET_STATUS) @@ -88,9 +88,9 @@ TEST(HEALTH_CHECK_TABLE, GET_STATUS) EXPECT_TRUE(health_check_session_get_status(7) == -1); // TEST Delete By Session ID - EXPECT_TRUE(health_check_session_del(session_id1, 1) == 0); - EXPECT_TRUE(health_check_session_del(session_id2, 2) == 0); - EXPECT_TRUE(health_check_session_del(session_id3, 3) == 0); + EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id2, 2, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == 0); } TEST(HEALTH_CHECK_TABLE, SET_STATUS) @@ -141,9 +141,9 @@ TEST(HEALTH_CHECK_TABLE, SET_STATUS) EXPECT_TRUE(health_check_session_get_status(session_id3) == 1); // TEST Delete By Session ID - EXPECT_TRUE(health_check_session_del(session_id1, 1) == 0); - EXPECT_TRUE(health_check_session_del(session_id2, 2) == 0); - EXPECT_TRUE(health_check_session_del(session_id3, 3) == 0); + EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id2, 2, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == 0); } #if 0 @@ -185,15 +185,15 @@ TEST(HEALTH_CHECK_TABLE, DELETE) EXPECT_TRUE((session_id6 = health_check_session_add(6, 0, &policy3)) > 0); // TEST Delete By Session ID - EXPECT_TRUE(health_check_session_del(session_id1, 1) == 0); - EXPECT_TRUE(health_check_session_del(session_id1, 1) == -1); - EXPECT_TRUE(health_check_session_del(session_id2, 2) == 0); - EXPECT_TRUE(health_check_session_del(session_id3, 3) == 0); - EXPECT_TRUE(health_check_session_del(session_id3, 3) == -1); - EXPECT_TRUE(health_check_session_del(session_id5, 4) == 0); - EXPECT_TRUE(health_check_session_del(session_id5, 5) == 0); - EXPECT_TRUE(health_check_session_del(session_id6, 6) == 0); - EXPECT_TRUE(health_check_session_del(session_id6, 6) == -1); + EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == -1); + EXPECT_TRUE(health_check_session_del(session_id2, 2, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == -1); + EXPECT_TRUE(health_check_session_del(session_id5, 4, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id5, 5, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id6, 6, 0) == 0); + EXPECT_TRUE(health_check_session_del(session_id6, 6, 0) == -1); } #endif diff --git a/conf/sce.conf b/conf/sce.conf index 6fc6e46..ddb585d 100644 --- a/conf/sce.conf +++ b/conf/sce.conf @@ -71,11 +71,6 @@ prometheus_listen_port=9001 prometheus_listen_url=/sce_prometheus [metrics] -# Kafka Topic: POLICY-RULE-METRIC -enable=1 -interval_s=1 -telegraf_bind_address=127.0.0.1 -telegraf_listen_port=8300 output_fs_interval_ms=500 output_kafka_interval_ms=1000 data_center=center-xxg-tsgx diff --git a/platform/include/health_check.h b/platform/include/health_check.h index 9e4d7cd..ea0f6f9 100644 --- a/platform/include/health_check.h +++ b/platform/include/health_check.h @@ -8,7 +8,7 @@ extern "C" #include "policy.h" -void health_check_session_init(const char *profile); +void health_check_session_init(const char *profile, struct kafka *kfk); // return 0 : success // return -1 : key exist @@ -17,7 +17,7 @@ uint64_t health_check_session_add(int profile_id, int vsys_id, const struct heal // return 0 : success // return -1 : key not exist -int health_check_session_del(uint64_t session_id, int profile_id); +int health_check_session_del(uint64_t session_id, int profile_id, int vsys_id); // return 1 : active // return 0 : inactive diff --git a/platform/include/sf_status.h b/platform/include/sf_status.h index f5541f1..90f42b8 100644 --- a/platform/include/sf_status.h +++ b/platform/include/sf_status.h @@ -6,17 +6,21 @@ extern "C" { #endif +#include "kafka.h" #include -#include "uthash.h" -struct sf_status *sf_status_create(const char *profile); +struct sf_status_key +{ + uint32_t vsys_id; + uint32_t sf_profile_id; +}; + +struct sf_status *sf_status_create(const char *profile, struct kafka *kfk); void sf_status_destory(struct sf_status *handle); -void sf_status_reset(struct sf_status *handle); - -void sf_status_delete(struct sf_status *handle, int sf_profile_id); -void sf_status_update(struct sf_status *handle, int sf_vsys_id, int sf_profile_id, int sf_status, int sf_latency); -void sf_status_send(struct sf_status *handle); -int sf_status_get_interval(struct sf_status *handle); +void sf_status_delete(struct sf_status *handle, const struct sf_status_key *key); +void sf_status_update(struct sf_status *handle, const struct sf_status_key *key, int sf_status, int sf_latency); +void sf_status_output(struct sf_status *handle); +int sf_status_get_ouput_interval_ms(struct sf_status *handle); #ifdef __cplusplus } diff --git a/platform/src/health_check.cpp b/platform/src/health_check.cpp index 726e852..abe3730 100644 --- a/platform/src/health_check.cpp +++ b/platform/src/health_check.cpp @@ -31,6 +31,7 @@ #define PACKET_SIZE 64 #define HC_DEV_NAME_LEN 16 #define HC_LOCAL_ADDRESS_LEN 64 +#define TIMESPEC_TO_MSEC(ts) ((ts).tv_sec * 1000 + (ts).tv_nsec / 1000000) struct session_table { @@ -74,7 +75,7 @@ static struct session_table_addr g_handle_bfd; static struct session_table_addr g_handle_none; static struct sf_status *g_sf_status = NULL; -int sleep_ms = 300; +int next_check_wait_ms = 300; int enable = 1; int icmp_cycle_time_s = 10; char path[BFD_PATHLEN]; @@ -335,7 +336,7 @@ static int send_icmp_cycle() return pid; } -void health_check_session_init(const char *profile) +void health_check_session_init(const char *profile, struct kafka *kfk) { char default_gw_mac_str[32] = { 0 }; memset(&g_handle, 0, sizeof(g_handle)); @@ -358,7 +359,7 @@ void health_check_session_init(const char *profile) return; } - g_sf_status = sf_status_create(profile); + g_sf_status = sf_status_create(profile, kfk); if (strlen(gateway_address) > 0) { health_check_method_table_add(&g_handle_none, gateway_address); } @@ -503,7 +504,7 @@ uint64_t health_check_session_add(int profile_id, int vsys_id, const struct heal // return 0 : success // return -1 : key not exist -int health_check_session_del(uint64_t session_id, int profile_id) +int health_check_session_del(uint64_t session_id, int profile_id, int vsys_id) { int ret = 0; struct session_iterm *tmp = NULL; @@ -533,7 +534,10 @@ int health_check_session_del(uint64_t session_id, int profile_id) end: HASH_DELETE(hh1, g_handle.root_by_id, tmp); - sf_status_delete(g_sf_status, profile_id); + struct sf_status_key key = {0}; + key.vsys_id = vsys_id; + key.sf_profile_id = profile_id; + sf_status_delete(g_sf_status, &key); pthread_rwlock_unlock(&g_handle.rwlock); free(tmp); tmp = NULL; @@ -623,7 +627,7 @@ static int get_mac_by_addr(char *addr, uint8_t *buf) static void *_health_check_session_foreach(void *arg) { int is_active = 0; - int interval_s = sf_status_get_interval(g_sf_status); + int ouput_interval_ms = sf_status_get_ouput_interval_ms(g_sf_status); struct bfd_vtysh_client client; struct session_iterm *tmp = NULL; struct session_iterm *node = NULL; @@ -632,10 +636,10 @@ static void *_health_check_session_foreach(void *arg) struct sockaddr_in addr; struct timespec current_time; - struct timespec g_status_last_send_time; + struct timespec last_output_time; clock_gettime(CLOCK_MONOTONIC, ¤t_time); - clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time); + last_output_time = current_time; health_check_session_init_bfd_client(&client); bfd_vtysh_connect(&client); @@ -661,7 +665,10 @@ static void *_health_check_session_foreach(void *arg) is_active = 0; } - sf_status_update(g_sf_status, node->vsys_id, node->profile_id, is_active, 0); + struct sf_status_key key = {0}; + key.vsys_id = node->vsys_id; + key.sf_profile_id = node->profile_id; + sf_status_update(g_sf_status, &key, is_active, 0); if (node->is_active != is_active) { node->is_active = is_active; if (node->is_active == 1) { @@ -673,41 +680,31 @@ static void *_health_check_session_foreach(void *arg) health_check_method_table_set_mac(&g_handle_bfd, node->policy.address, init_mac); } } - if (sleep_ms > node->policy.interval_ms) - sleep_ms = node->policy.interval_ms; + if (next_check_wait_ms > node->policy.interval_ms) + next_check_wait_ms = node->policy.interval_ms; } pthread_rwlock_unlock(&g_handle.rwlock); clock_gettime(CLOCK_MONOTONIC, ¤t_time); - if (current_time.tv_sec - g_status_last_send_time.tv_sec >= interval_s) + int next_output_wait_ms = ouput_interval_ms - (TIMESPEC_TO_MSEC(current_time) - TIMESPEC_TO_MSEC(last_output_time)); + if (next_output_wait_ms <= 0) { - sf_status_send(g_sf_status); - clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time); + next_output_wait_ms = 0; } - // interval_s : 1000 ms - // sleep_ms : 900 ms - if (interval_s * 1000 > sleep_ms) + if (next_output_wait_ms >= next_check_wait_ms) { - usleep(sleep_ms * 1000); + usleep(next_check_wait_ms * 1000); } - // interval_s : 900 ms - // sleep_ms : 1000 ms else { - int tmp_time = sleep_ms; - while(tmp_time > interval_s * 1000) { - usleep(interval_s * 1000 * 1000); + usleep(next_output_wait_ms * 1000); - clock_gettime(CLOCK_MONOTONIC, ¤t_time); - if (current_time.tv_sec - g_status_last_send_time.tv_sec >= interval_s) - { - sf_status_send(g_sf_status); - clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time); - } - tmp_time -= interval_s * 1000; - } - usleep(tmp_time * 1000); + clock_gettime(CLOCK_MONOTONIC, ¤t_time); + sf_status_output(g_sf_status); + last_output_time = current_time; + + usleep((next_check_wait_ms - next_output_wait_ms) * 1000); } } bfd_vtysh_close(&client); diff --git a/platform/src/main.cpp b/platform/src/main.cpp index 6cf813e..1f58b91 100644 --- a/platform/src/main.cpp +++ b/platform/src/main.cpp @@ -9,7 +9,6 @@ #include "log.h" #include "utils.h" #include "sf_metrics.h" -#include "health_check.h" #include "global_metrics.h" struct breakpad_instance *g_breakpad = NULL; @@ -75,7 +74,7 @@ static void *worker_thread_cycle(void *arg) int n_packet_recved = 0; char thread_name[16]; uint64_t sf_metrics_last_send_ts = timestamp_get_msec(ts); - uint64_t sf_metrics_send_interval = sf_metrics_get_interval(sf_metrics) * 1000; + uint64_t sf_metrics_send_interval = sf_metrics_get_interval(sf_metrics); ATOMIC_SET(&thread_ctx->thread_is_runing, 1); snprintf(thread_name, sizeof(thread_name), "sce:worker-%d", thread_index); @@ -183,8 +182,6 @@ int main(int argc, char **argv) g_breakpad = breakpad_init(profile, "system", g_default_logger, __sce_version); - health_check_session_init(profile); - struct sce_ctx *ctx = sce_ctx_create(profile); if (ctx == NULL) { diff --git a/platform/src/policy.cpp b/platform/src/policy.cpp index bb6cf79..7fc12af 100644 --- a/platform/src/policy.cpp +++ b/platform/src/policy.cpp @@ -976,7 +976,7 @@ static void sf_param_free_cb(int table_id, void **ad, long argl, void *argp) { if (param->sf_connectivity.method != ENCAPSULATE_METHOD_LAYER2_SWITCH) { - health_check_session_del(param->health_check_session_id, param->sf_profile_id); + health_check_session_del(param->health_check_session_id, param->sf_profile_id, param->sf_vsys_id); } LOG_INFO("%s: Del sf profile: %d", LOG_TAG_POLICY, param->sf_profile_id); free(param); @@ -1296,16 +1296,24 @@ const char *action_desc_tostring(enum action_desc action_desc) switch (action_desc) { // success action - case ACTION_FORWAED_DUE_SELECTED_SF: return "forward"; + case ACTION_FORWAED_DUE_SELECTED_SF: + return "forward"; // failure action - case ACTION_BYPASS_DUE_FAILURE_ACTION: return "bypass"; - case ACTION_BLOCK_DUE_FAILURE_ACTION: return "block"; - case ACTION_BLOCK_DUE_UNAVAILABLE_ACTION: return "re-dispatch block"; - case ACTION_BYPASS_DUE_UNAVAILABLE_ACTION: return "re-dispatch bypass"; - case ACTION_BYPASS_DUE_HEALTH_SF_LIMIT: return "re-dispatch bypass(health SF limit)"; + case ACTION_BYPASS_DUE_FAILURE_ACTION: + return "bypass"; + case ACTION_BLOCK_DUE_FAILURE_ACTION: + return "block"; + case ACTION_BLOCK_DUE_UNAVAILABLE_ACTION: + return "re-dispatch block"; + case ACTION_BYPASS_DUE_UNAVAILABLE_ACTION: + return "re-dispatch bypass"; + case ACTION_BYPASS_DUE_HEALTH_SF_LIMIT: + return "re-dispatch bypass(health SF limit)"; // default action - case ACTION_BYPASS_DUE_DEFAULT: return "bypass(default)"; - case ACTION_BYPASS_DUE_INVALID_POLICY: return "bypass(invalid policy)"; + case ACTION_BYPASS_DUE_DEFAULT: + return "bypass(default)"; + case ACTION_BYPASS_DUE_INVALID_POLICY: + return "bypass(invalid policy)"; // unreachable default: return "action unknown"; diff --git a/platform/src/sce.cpp b/platform/src/sce.cpp index fe4ba89..fab0db8 100644 --- a/platform/src/sce.cpp +++ b/platform/src/sce.cpp @@ -3,6 +3,7 @@ #include "sce.h" #include "log.h" +#include "health_check.h" char *memdup(const char *src, int len) { @@ -91,6 +92,7 @@ struct sce_ctx *sce_ctx_create(const char *profile) { goto error_out; } + health_check_session_init(profile, sce_ctx->kfk); sce_ctx->ts = timestamp_new(sce_ctx->ts_update_interval_ms); sce_ctx->sf_metrics = sf_metrics_create(profile, sce_ctx->kfk); diff --git a/platform/src/sf_metrics.cpp b/platform/src/sf_metrics.cpp index 90d3200..e56d05d 100644 --- a/platform/src/sf_metrics.cpp +++ b/platform/src/sf_metrics.cpp @@ -94,7 +94,7 @@ static void *fs2kafka_thread_cycle(void *arg) free(ptr); } - usleep(handle->cfg.output_fs_interval_ms * 1000); + usleep(handle->cfg.output_kafka_interval_ms * 1000); } ATOMIC_SET(&handle->thr_is_runing, 0); diff --git a/platform/src/sf_status.cpp b/platform/src/sf_status.cpp index 6ba84c1..c91cc2f 100644 --- a/platform/src/sf_status.cpp +++ b/platform/src/sf_status.cpp @@ -4,66 +4,89 @@ #include #include #include +#include #include "log.h" #include "utils.h" #include "sf_status.h" +#include "uthash.h" -#define SCE_SF_STATUS "service_function_status,vsys_id=%d,sf_profile_id=%d sf_status=%d,sf_latency_us=%d" - -struct node +struct metric { - int sf_vsys_id; - int sf_profile_id; + struct sf_status_key key; + int sf_status; int sf_latency; UT_hash_handle hh; }; -struct sf_status_config +struct config { - int enable; - int interval_s; - int telegraf_listen_port; - char telegraf_bind_address[2048]; + int output_kafka_interval_ms; + char data_center[256]; + char device_group[256]; + char device_id[256]; }; struct sf_status { - struct sf_status_config config; - struct sockaddr_in sock_addr; - int sockfd; + struct config cfg; - struct node *htable; - uint64_t htable_elem_count; + int sf_status_idx; + int sf_latency_idx; + + struct kafka *kfk; + struct fieldstat_easy *fs; + struct metric *htable; }; -static void sf_status_parse_config(const char *profile, struct sf_status_config *config) -{ - MESA_load_profile_int_def(profile, "METRICS", "enable", &(config->enable), 1); - MESA_load_profile_int_def(profile, "METRICS", "interval_s", &(config->interval_s), 1); - MESA_load_profile_int_def(profile, "METRICS", "telegraf_listen_port", &(config->telegraf_listen_port), 8300); - MESA_load_profile_string_def(profile, "METRICS", "telegraf_bind_address", config->telegraf_bind_address, sizeof(config->telegraf_bind_address), "127.0.0.1"); +/****************************************************************************** + * Public API + ******************************************************************************/ - LOG_DEBUG("%s: METRICS->enable : %d", LOG_TAG_SFSTATUS, config->enable); - LOG_DEBUG("%s: METRICS->interval_s : %d", LOG_TAG_SFSTATUS, config->interval_s); - LOG_DEBUG("%s: METRICS->telegraf_listen_port : %d", LOG_TAG_SFSTATUS, config->telegraf_listen_port); - LOG_DEBUG("%s: METRICS->telegraf_bind_address : %s", LOG_TAG_SFSTATUS, config->telegraf_bind_address); +struct sf_status *sf_status_create(const char *profile, struct kafka *kfk) +{ + struct sf_status *handle = (struct sf_status *)calloc(1, sizeof(struct sf_status)); + if (!handle) + { + return NULL; + } + + MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000); + MESA_load_profile_string_def(profile, "metrics", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), ""); + MESA_load_profile_string_def(profile, "metrics", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), ""); + MESA_load_profile_string_def(profile, "metrics", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), ""); + + const struct fieldstat_tag tags[] = { + {"data_center", TAG_CSTRING, {.value_str = handle->cfg.data_center}}, + {"device_group", TAG_CSTRING, {.value_str = handle->cfg.device_group}}, + {"device_id", TAG_CSTRING, {.value_str = handle->cfg.device_id}}, + }; + + handle->kfk = kfk; + handle->fs = fieldstat_easy_new(1, "service_function_status", tags, sizeof(tags) / sizeof(tags[0])); + if (!handle->fs) + { + goto error_out; + } + + handle->sf_status_idx = fieldstat_easy_register_counter(handle->fs, "sf_status"); + handle->sf_latency_idx = fieldstat_easy_register_counter(handle->fs, "sf_latency_us"); + + return handle; + +error_out: + sf_status_destory(handle); + return NULL; } void sf_status_destory(struct sf_status *handle) { if (handle) { - if (handle->sockfd) - { - close(handle->sockfd); - handle->sockfd = -1; - } - - struct node *temp = NULL; - struct node *node = NULL; + struct metric *temp = NULL; + struct metric *node = NULL; HASH_ITER(hh, handle->htable, node, temp) { HASH_DELETE(hh, handle->htable, node); @@ -71,150 +94,99 @@ void sf_status_destory(struct sf_status *handle) node = NULL; } - handle->htable_elem_count = 0; + if (handle->fs) + { + fieldstat_easy_free(handle->fs); + handle->fs = NULL; + } + free(handle); handle = NULL; } } -struct sf_status *sf_status_create(const char *profile) +void sf_status_delete(struct sf_status *handle, const struct sf_status_key *key) { - struct sf_status *handle = (struct sf_status *)calloc(1, sizeof(struct sf_status)); - assert(handle); - sf_status_parse_config(profile, &(handle->config)); - - if (handle->config.enable == 0) - { - return handle; - } - - handle->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - handle->sock_addr.sin_family = AF_INET; - handle->sock_addr.sin_port = htons(handle->config.telegraf_listen_port); - handle->sock_addr.sin_addr.s_addr = inet_addr(handle->config.telegraf_bind_address); - handle->htable_elem_count = 0; - if (handle->sockfd == -1) - { - LOG_ERROR("%s: failed to create udp sockfd %s:%d, errno: %d, %s", LOG_TAG_SFSTATUS, handle->config.telegraf_bind_address, handle->config.telegraf_listen_port, errno, strerror(errno)); - sf_status_destory(handle); - return NULL; - } - - return handle; -} - -void sf_status_reset(struct sf_status *handle) -{ - if (handle == NULL || handle->config.enable == 0) + if (!handle) { return; } - LOG_DEBUG("%s: reset: elem_num %lu", LOG_TAG_SFSTATUS, handle->htable_elem_count); - - struct node *temp = NULL; - struct node *node = NULL; - HASH_ITER(hh, handle->htable, node, temp) - { - HASH_DELETE(hh, handle->htable, node); - - free(node); - node = NULL; - handle->htable_elem_count--; - } -} - -void sf_status_delete(struct sf_status *handle, int sf_profile_id) -{ - if (handle == NULL || handle->config.enable == 0) - { - return; - } - - struct node *temp = NULL; - HASH_FIND(hh, handle->htable, &sf_profile_id, sizeof(sf_profile_id), temp); + struct metric *temp = NULL; + HASH_FIND(hh, handle->htable, key, sizeof(struct sf_status_key), temp); if (temp) { - handle->htable_elem_count--; - LOG_DEBUG("%s: delete: sf_profile %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, handle->htable_elem_count); HASH_DELETE(hh, handle->htable, temp); free(temp); temp = NULL; } - else - { - LOG_DEBUG("%s: delete: sf_profile %d not exists, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, handle->htable_elem_count); - } } -void sf_status_update(struct sf_status *handle, int sf_vsys_id, int sf_profile_id, int sf_status, int sf_latency) +void sf_status_update(struct sf_status *handle, const struct sf_status_key *key, int sf_status, int sf_latency) { - if (handle == NULL || handle->config.enable == 0) + if (!handle) { return; } - struct node *temp = NULL; - HASH_FIND(hh, handle->htable, &sf_profile_id, sizeof(sf_profile_id), temp); + struct metric *temp = NULL; + HASH_FIND(hh, handle->htable, key, sizeof(struct sf_status_key), temp); if (temp) { - if (temp->sf_status != sf_status) - { - LOG_DEBUG("%s: update: sf_profile %d status %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, sf_status, handle->htable_elem_count); - } - temp->sf_vsys_id = sf_vsys_id; - temp->sf_profile_id = sf_profile_id; temp->sf_status = sf_status; temp->sf_latency = sf_latency; } else { - handle->htable_elem_count++; - LOG_DEBUG("%s: insert: sf_profile %d status %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, sf_status, handle->htable_elem_count); - temp = (struct node *)calloc(1, sizeof(struct node)); - temp->sf_vsys_id = sf_vsys_id; - temp->sf_profile_id = sf_profile_id; + temp = (struct metric *)calloc(1, sizeof(struct metric)); + temp->key.vsys_id = key->vsys_id; + temp->key.sf_profile_id = key->sf_profile_id; temp->sf_status = sf_status; temp->sf_latency = sf_latency; - - HASH_ADD(hh, handle->htable, sf_profile_id, sizeof(sf_profile_id), temp); + HASH_ADD(hh, handle->htable, key, sizeof(struct sf_status_key), temp); } } -void sf_status_send(struct sf_status *handle) +void sf_status_output(struct sf_status *handle) { - char buff[2048]; - int nsend = 0; - int size = sizeof(buff); - - struct node *temp = NULL; - struct node *node = NULL; - - if (handle == NULL || handle->config.enable == 0) + if (!handle) { return; } + struct metric *temp = NULL; + struct metric *node = NULL; HASH_ITER(hh, handle->htable, node, temp) { - memset(buff, 0, size); - nsend = snprintf(buff, size, SCE_SF_STATUS, - node->sf_vsys_id, - node->sf_profile_id, - node->sf_status, - node->sf_latency); - sendto(handle->sockfd, buff, nsend, 0, (struct sockaddr *)&handle->sock_addr, sizeof(handle->sock_addr)); + const struct fieldstat_tag tags[] = { + {"vsys_id", TAG_INTEGER, {.value_longlong = node->key.vsys_id}}, + {"sf_profile_id", TAG_INTEGER, {.value_longlong = node->key.sf_profile_id}}, + }; + + fieldstat_easy_counter_set(handle->fs, 0, handle->sf_status_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sf_status); + fieldstat_easy_counter_set(handle->fs, 0, handle->sf_latency_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sf_latency); + } + + char **ptr = NULL; + size_t len = 0; + fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len); + if (ptr) + { + for (size_t i = 0; i < len; i++) + { + kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); + free(ptr[i]); + ptr[i] = NULL; + } + free(ptr); } } -int sf_status_get_interval(struct sf_status *handle) +int sf_status_get_ouput_interval_ms(struct sf_status *handle) { - if (handle == NULL) + if (!handle) { return 0; } - else - { - return handle->config.interval_s; - } + return handle->cfg.output_kafka_interval_ms; } \ No newline at end of file diff --git a/platform/test/gtest_sf_status.cpp b/platform/test/gtest_sf_status.cpp index d34ee8a..8f1cfa7 100644 --- a/platform/test/gtest_sf_status.cpp +++ b/platform/test/gtest_sf_status.cpp @@ -4,12 +4,36 @@ TEST(SF_STATUS, TEST) { - struct sf_status *status = sf_status_create("./test_resource/sce.conf"); - EXPECT_TRUE(sf_status_get_interval(status) == 1); - sf_status_update(status, 11, 1, 0, 0); - sf_status_update(status, 22, 2, 1, 1); - sf_status_send(status); + struct kafka *kfk = kafka_create("./test_resource/sce.conf"); + EXPECT_TRUE(kfk != NULL); + struct sf_status *status = sf_status_create("./test_resource/sce.conf", kfk); + EXPECT_TRUE(status != NULL); + + EXPECT_TRUE(sf_status_get_ouput_interval_ms(status) == 1000); + + struct sf_status_key key1 = {0}; + key1.vsys_id = 11; + key1.sf_profile_id = 12; + + struct sf_status_key key2 = {0}; + key2.vsys_id = 21; + key2.sf_profile_id = 22; + + sf_status_update(status, &key1, 1, 2); + sf_status_update(status, &key2, 2, 1); + printf("\n========================================\n expect key1 + key2 \n========================================\n"); + sf_status_output(status); + + sf_status_delete(status, &key1); + printf("\n========================================\n expect only key2 \n========================================\n"); + sf_status_output(status); + + sf_status_delete(status, &key2); + printf("\n========================================\n expect no output \n========================================\n"); + sf_status_output(status); + sf_status_destory(status); + kafka_destroy(kfk); } int main(int argc, char **argv) diff --git a/platform/test/test_resource/sce.conf b/platform/test/test_resource/sce.conf index 04360ee..b0e6c93 100644 --- a/platform/test/test_resource/sce.conf +++ b/platform/test/test_resource/sce.conf @@ -23,11 +23,6 @@ redis_server=127.0.0.1 redis_port_range=6379 [metrics] -# Kafka Topic: POLICY-RULE-METRIC -enable=1 -interval_s=1 -telegraf_bind_address=127.0.0.1 -telegraf_listen_port=8300 output_fs_interval_ms=500 output_kafka_interval_ms=1000 data_center=center-xxg-tsgx diff --git a/test/gtest_utils.h b/test/gtest_utils.h index 44ce420..25852bb 100644 --- a/test/gtest_utils.h +++ b/test/gtest_utils.h @@ -15,7 +15,6 @@ extern "C" #include "vxlan.h" #include "packet_io.h" #include "sf_metrics.h" -#include "health_check.h" #include "global_metrics.h" #include "gmock_marsio.h" @@ -87,7 +86,6 @@ inline struct gtest_frame *gtest_frame_new(const char *json_file, const char *de system(cmdline); EXPECT_TRUE(LOG_INIT("./conf/zlog.conf") == 0); - health_check_session_init(profile); sce_ctx = sce_ctx_create(profile); EXPECT_TRUE(sce_ctx != nullptr); diff --git a/test/test_data/conf/sce.conf b/test/test_data/conf/sce.conf index 2ad588b..83c7c1c 100644 --- a/test/test_data/conf/sce.conf +++ b/test/test_data/conf/sce.conf @@ -70,11 +70,6 @@ prometheus_listen_port=9001 prometheus_listen_url=/sce_prometheus [metrics] -# Kafka Topic: POLICY-RULE-METRIC -enable=0 -interval_s=1 -telegraf_bind_address=127.0.0.1 -telegraf_listen_port=8300 output_fs_interval_ms=500 output_kafka_interval_ms=1000 data_center=center-xxg-tsgx