feature: TSG-21852 service_function_status support fieldstat4

This commit is contained in:
luwenpeng
2024-07-19 10:02:07 +08:00
parent 9e63902c0d
commit c8d40f1347
15 changed files with 222 additions and 229 deletions

View File

@@ -216,6 +216,12 @@ void kafka_destroy(struct kafka *handle)
int kafka_send(struct kafka *handle, enum topic_idx idx, const char *data, int len)
{
if (!handle)
{
LOG_ERROR("%s: handle is NULL", LOG_TAG_KAFKA);
return -1;
}
if (idx < 0 || idx >= MAX_TOPIC_NUM)
{
LOG_ERROR("%s: invalid topic index: %d", LOG_TAG_KAFKA, idx);

View File

@@ -13,7 +13,7 @@ TEST(HEALTH_CHECK_TABLE, INSERT)
uint64_t session_id4 = 0;
uint64_t session_id5 = 0;
// TEST Create
health_check_session_init("./sce.conf");
health_check_session_init("./sce.conf", NULL);
// TEST Insert
struct health_check policy1;
@@ -43,11 +43,11 @@ TEST(HEALTH_CHECK_TABLE, INSERT)
EXPECT_TRUE((session_id5 = health_check_session_add(5, 0, &policy3)) > 0);
// TEST Delete By Session ID
EXPECT_TRUE(health_check_session_del(session_id1, 1) == 0);
EXPECT_TRUE(health_check_session_del(session_id2, 2) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3) == 0);
EXPECT_TRUE(health_check_session_del(session_id4, 4) == 0);
EXPECT_TRUE(health_check_session_del(session_id5, 5) == 0);
EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id2, 2, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id4, 4, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id5, 5, 0) == 0);
}
TEST(HEALTH_CHECK_TABLE, GET_STATUS)
@@ -88,9 +88,9 @@ TEST(HEALTH_CHECK_TABLE, GET_STATUS)
EXPECT_TRUE(health_check_session_get_status(7) == -1);
// TEST Delete By Session ID
EXPECT_TRUE(health_check_session_del(session_id1, 1) == 0);
EXPECT_TRUE(health_check_session_del(session_id2, 2) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3) == 0);
EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id2, 2, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == 0);
}
TEST(HEALTH_CHECK_TABLE, SET_STATUS)
@@ -141,9 +141,9 @@ TEST(HEALTH_CHECK_TABLE, SET_STATUS)
EXPECT_TRUE(health_check_session_get_status(session_id3) == 1);
// TEST Delete By Session ID
EXPECT_TRUE(health_check_session_del(session_id1, 1) == 0);
EXPECT_TRUE(health_check_session_del(session_id2, 2) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3) == 0);
EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id2, 2, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == 0);
}
#if 0
@@ -185,15 +185,15 @@ TEST(HEALTH_CHECK_TABLE, DELETE)
EXPECT_TRUE((session_id6 = health_check_session_add(6, 0, &policy3)) > 0);
// TEST Delete By Session ID
EXPECT_TRUE(health_check_session_del(session_id1, 1) == 0);
EXPECT_TRUE(health_check_session_del(session_id1, 1) == -1);
EXPECT_TRUE(health_check_session_del(session_id2, 2) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3) == -1);
EXPECT_TRUE(health_check_session_del(session_id5, 4) == 0);
EXPECT_TRUE(health_check_session_del(session_id5, 5) == 0);
EXPECT_TRUE(health_check_session_del(session_id6, 6) == 0);
EXPECT_TRUE(health_check_session_del(session_id6, 6) == -1);
EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id1, 1, 0) == -1);
EXPECT_TRUE(health_check_session_del(session_id2, 2, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id3, 3, 0) == -1);
EXPECT_TRUE(health_check_session_del(session_id5, 4, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id5, 5, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id6, 6, 0) == 0);
EXPECT_TRUE(health_check_session_del(session_id6, 6, 0) == -1);
}
#endif

View File

@@ -71,11 +71,6 @@ prometheus_listen_port=9001
prometheus_listen_url=/sce_prometheus
[metrics]
# Kafka Topic: POLICY-RULE-METRIC
enable=1
interval_s=1
telegraf_bind_address=127.0.0.1
telegraf_listen_port=8300
output_fs_interval_ms=500
output_kafka_interval_ms=1000
data_center=center-xxg-tsgx

View File

@@ -8,7 +8,7 @@ extern "C"
#include "policy.h"
void health_check_session_init(const char *profile);
void health_check_session_init(const char *profile, struct kafka *kfk);
// return 0 : success
// return -1 : key exist
@@ -17,7 +17,7 @@ uint64_t health_check_session_add(int profile_id, int vsys_id, const struct heal
// return 0 : success
// return -1 : key not exist
int health_check_session_del(uint64_t session_id, int profile_id);
int health_check_session_del(uint64_t session_id, int profile_id, int vsys_id);
// return 1 : active
// return 0 : inactive

View File

@@ -6,17 +6,21 @@ extern "C"
{
#endif
#include "kafka.h"
#include <stdint.h>
#include "uthash.h"
struct sf_status *sf_status_create(const char *profile);
struct sf_status_key
{
uint32_t vsys_id;
uint32_t sf_profile_id;
};
struct sf_status *sf_status_create(const char *profile, struct kafka *kfk);
void sf_status_destory(struct sf_status *handle);
void sf_status_reset(struct sf_status *handle);
void sf_status_delete(struct sf_status *handle, int sf_profile_id);
void sf_status_update(struct sf_status *handle, int sf_vsys_id, int sf_profile_id, int sf_status, int sf_latency);
void sf_status_send(struct sf_status *handle);
int sf_status_get_interval(struct sf_status *handle);
void sf_status_delete(struct sf_status *handle, const struct sf_status_key *key);
void sf_status_update(struct sf_status *handle, const struct sf_status_key *key, int sf_status, int sf_latency);
void sf_status_output(struct sf_status *handle);
int sf_status_get_ouput_interval_ms(struct sf_status *handle);
#ifdef __cplusplus
}

View File

@@ -31,6 +31,7 @@
#define PACKET_SIZE 64
#define HC_DEV_NAME_LEN 16
#define HC_LOCAL_ADDRESS_LEN 64
#define TIMESPEC_TO_MSEC(ts) ((ts).tv_sec * 1000 + (ts).tv_nsec / 1000000)
struct session_table
{
@@ -74,7 +75,7 @@ static struct session_table_addr g_handle_bfd;
static struct session_table_addr g_handle_none;
static struct sf_status *g_sf_status = NULL;
int sleep_ms = 300;
int next_check_wait_ms = 300;
int enable = 1;
int icmp_cycle_time_s = 10;
char path[BFD_PATHLEN];
@@ -335,7 +336,7 @@ static int send_icmp_cycle()
return pid;
}
void health_check_session_init(const char *profile)
void health_check_session_init(const char *profile, struct kafka *kfk)
{
char default_gw_mac_str[32] = { 0 };
memset(&g_handle, 0, sizeof(g_handle));
@@ -358,7 +359,7 @@ void health_check_session_init(const char *profile)
return;
}
g_sf_status = sf_status_create(profile);
g_sf_status = sf_status_create(profile, kfk);
if (strlen(gateway_address) > 0) {
health_check_method_table_add(&g_handle_none, gateway_address);
}
@@ -503,7 +504,7 @@ uint64_t health_check_session_add(int profile_id, int vsys_id, const struct heal
// return 0 : success
// return -1 : key not exist
int health_check_session_del(uint64_t session_id, int profile_id)
int health_check_session_del(uint64_t session_id, int profile_id, int vsys_id)
{
int ret = 0;
struct session_iterm *tmp = NULL;
@@ -533,7 +534,10 @@ int health_check_session_del(uint64_t session_id, int profile_id)
end:
HASH_DELETE(hh1, g_handle.root_by_id, tmp);
sf_status_delete(g_sf_status, profile_id);
struct sf_status_key key = {0};
key.vsys_id = vsys_id;
key.sf_profile_id = profile_id;
sf_status_delete(g_sf_status, &key);
pthread_rwlock_unlock(&g_handle.rwlock);
free(tmp);
tmp = NULL;
@@ -623,7 +627,7 @@ static int get_mac_by_addr(char *addr, uint8_t *buf)
static void *_health_check_session_foreach(void *arg)
{
int is_active = 0;
int interval_s = sf_status_get_interval(g_sf_status);
int ouput_interval_ms = sf_status_get_ouput_interval_ms(g_sf_status);
struct bfd_vtysh_client client;
struct session_iterm *tmp = NULL;
struct session_iterm *node = NULL;
@@ -632,10 +636,10 @@ static void *_health_check_session_foreach(void *arg)
struct sockaddr_in addr;
struct timespec current_time;
struct timespec g_status_last_send_time;
struct timespec last_output_time;
clock_gettime(CLOCK_MONOTONIC, &current_time);
clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time);
last_output_time = current_time;
health_check_session_init_bfd_client(&client);
bfd_vtysh_connect(&client);
@@ -661,7 +665,10 @@ static void *_health_check_session_foreach(void *arg)
is_active = 0;
}
sf_status_update(g_sf_status, node->vsys_id, node->profile_id, is_active, 0);
struct sf_status_key key = {0};
key.vsys_id = node->vsys_id;
key.sf_profile_id = node->profile_id;
sf_status_update(g_sf_status, &key, is_active, 0);
if (node->is_active != is_active) {
node->is_active = is_active;
if (node->is_active == 1) {
@@ -673,41 +680,31 @@ static void *_health_check_session_foreach(void *arg)
health_check_method_table_set_mac(&g_handle_bfd, node->policy.address, init_mac);
}
}
if (sleep_ms > node->policy.interval_ms)
sleep_ms = node->policy.interval_ms;
if (next_check_wait_ms > node->policy.interval_ms)
next_check_wait_ms = node->policy.interval_ms;
}
pthread_rwlock_unlock(&g_handle.rwlock);
clock_gettime(CLOCK_MONOTONIC, &current_time);
if (current_time.tv_sec - g_status_last_send_time.tv_sec >= interval_s)
int next_output_wait_ms = ouput_interval_ms - (TIMESPEC_TO_MSEC(current_time) - TIMESPEC_TO_MSEC(last_output_time));
if (next_output_wait_ms <= 0)
{
sf_status_send(g_sf_status);
clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time);
next_output_wait_ms = 0;
}
// interval_s : 1000 ms
// sleep_ms : 900 ms
if (interval_s * 1000 > sleep_ms)
if (next_output_wait_ms >= next_check_wait_ms)
{
usleep(sleep_ms * 1000);
usleep(next_check_wait_ms * 1000);
}
// interval_s : 900 ms
// sleep_ms : 1000 ms
else
{
int tmp_time = sleep_ms;
while(tmp_time > interval_s * 1000) {
usleep(interval_s * 1000 * 1000);
usleep(next_output_wait_ms * 1000);
clock_gettime(CLOCK_MONOTONIC, &current_time);
if (current_time.tv_sec - g_status_last_send_time.tv_sec >= interval_s)
{
sf_status_send(g_sf_status);
clock_gettime(CLOCK_MONOTONIC, &g_status_last_send_time);
}
tmp_time -= interval_s * 1000;
}
usleep(tmp_time * 1000);
clock_gettime(CLOCK_MONOTONIC, &current_time);
sf_status_output(g_sf_status);
last_output_time = current_time;
usleep((next_check_wait_ms - next_output_wait_ms) * 1000);
}
}
bfd_vtysh_close(&client);

View File

@@ -9,7 +9,6 @@
#include "log.h"
#include "utils.h"
#include "sf_metrics.h"
#include "health_check.h"
#include "global_metrics.h"
struct breakpad_instance *g_breakpad = NULL;
@@ -75,7 +74,7 @@ static void *worker_thread_cycle(void *arg)
int n_packet_recved = 0;
char thread_name[16];
uint64_t sf_metrics_last_send_ts = timestamp_get_msec(ts);
uint64_t sf_metrics_send_interval = sf_metrics_get_interval(sf_metrics) * 1000;
uint64_t sf_metrics_send_interval = sf_metrics_get_interval(sf_metrics);
ATOMIC_SET(&thread_ctx->thread_is_runing, 1);
snprintf(thread_name, sizeof(thread_name), "sce:worker-%d", thread_index);
@@ -183,8 +182,6 @@ int main(int argc, char **argv)
g_breakpad = breakpad_init(profile, "system", g_default_logger, __sce_version);
health_check_session_init(profile);
struct sce_ctx *ctx = sce_ctx_create(profile);
if (ctx == NULL)
{

View File

@@ -976,7 +976,7 @@ static void sf_param_free_cb(int table_id, void **ad, long argl, void *argp)
{
if (param->sf_connectivity.method != ENCAPSULATE_METHOD_LAYER2_SWITCH)
{
health_check_session_del(param->health_check_session_id, param->sf_profile_id);
health_check_session_del(param->health_check_session_id, param->sf_profile_id, param->sf_vsys_id);
}
LOG_INFO("%s: Del sf profile: %d", LOG_TAG_POLICY, param->sf_profile_id);
free(param);
@@ -1296,16 +1296,24 @@ const char *action_desc_tostring(enum action_desc action_desc)
switch (action_desc)
{
// success action
case ACTION_FORWAED_DUE_SELECTED_SF: return "forward";
case ACTION_FORWAED_DUE_SELECTED_SF:
return "forward";
// failure action
case ACTION_BYPASS_DUE_FAILURE_ACTION: return "bypass";
case ACTION_BLOCK_DUE_FAILURE_ACTION: return "block";
case ACTION_BLOCK_DUE_UNAVAILABLE_ACTION: return "re-dispatch block";
case ACTION_BYPASS_DUE_UNAVAILABLE_ACTION: return "re-dispatch bypass";
case ACTION_BYPASS_DUE_HEALTH_SF_LIMIT: return "re-dispatch bypass(health SF limit)";
case ACTION_BYPASS_DUE_FAILURE_ACTION:
return "bypass";
case ACTION_BLOCK_DUE_FAILURE_ACTION:
return "block";
case ACTION_BLOCK_DUE_UNAVAILABLE_ACTION:
return "re-dispatch block";
case ACTION_BYPASS_DUE_UNAVAILABLE_ACTION:
return "re-dispatch bypass";
case ACTION_BYPASS_DUE_HEALTH_SF_LIMIT:
return "re-dispatch bypass(health SF limit)";
// default action
case ACTION_BYPASS_DUE_DEFAULT: return "bypass(default)";
case ACTION_BYPASS_DUE_INVALID_POLICY: return "bypass(invalid policy)";
case ACTION_BYPASS_DUE_DEFAULT:
return "bypass(default)";
case ACTION_BYPASS_DUE_INVALID_POLICY:
return "bypass(invalid policy)";
// unreachable
default:
return "action unknown";

View File

@@ -3,6 +3,7 @@
#include "sce.h"
#include "log.h"
#include "health_check.h"
char *memdup(const char *src, int len)
{
@@ -91,6 +92,7 @@ struct sce_ctx *sce_ctx_create(const char *profile)
{
goto error_out;
}
health_check_session_init(profile, sce_ctx->kfk);
sce_ctx->ts = timestamp_new(sce_ctx->ts_update_interval_ms);
sce_ctx->sf_metrics = sf_metrics_create(profile, sce_ctx->kfk);

View File

@@ -94,7 +94,7 @@ static void *fs2kafka_thread_cycle(void *arg)
free(ptr);
}
usleep(handle->cfg.output_fs_interval_ms * 1000);
usleep(handle->cfg.output_kafka_interval_ms * 1000);
}
ATOMIC_SET(&handle->thr_is_runing, 0);

View File

@@ -4,66 +4,89 @@
#include <arpa/inet.h>
#include <sys/socket.h>
#include <MESA/MESA_prof_load.h>
#include <fieldstat/fieldstat_easy.h>
#include "log.h"
#include "utils.h"
#include "sf_status.h"
#include "uthash.h"
#define SCE_SF_STATUS "service_function_status,vsys_id=%d,sf_profile_id=%d sf_status=%d,sf_latency_us=%d"
struct node
struct metric
{
int sf_vsys_id;
int sf_profile_id;
struct sf_status_key key;
int sf_status;
int sf_latency;
UT_hash_handle hh;
};
struct sf_status_config
struct config
{
int enable;
int interval_s;
int telegraf_listen_port;
char telegraf_bind_address[2048];
int output_kafka_interval_ms;
char data_center[256];
char device_group[256];
char device_id[256];
};
struct sf_status
{
struct sf_status_config config;
struct sockaddr_in sock_addr;
int sockfd;
struct config cfg;
struct node *htable;
uint64_t htable_elem_count;
int sf_status_idx;
int sf_latency_idx;
struct kafka *kfk;
struct fieldstat_easy *fs;
struct metric *htable;
};
static void sf_status_parse_config(const char *profile, struct sf_status_config *config)
{
MESA_load_profile_int_def(profile, "METRICS", "enable", &(config->enable), 1);
MESA_load_profile_int_def(profile, "METRICS", "interval_s", &(config->interval_s), 1);
MESA_load_profile_int_def(profile, "METRICS", "telegraf_listen_port", &(config->telegraf_listen_port), 8300);
MESA_load_profile_string_def(profile, "METRICS", "telegraf_bind_address", config->telegraf_bind_address, sizeof(config->telegraf_bind_address), "127.0.0.1");
/******************************************************************************
* Public API
******************************************************************************/
LOG_DEBUG("%s: METRICS->enable : %d", LOG_TAG_SFSTATUS, config->enable);
LOG_DEBUG("%s: METRICS->interval_s : %d", LOG_TAG_SFSTATUS, config->interval_s);
LOG_DEBUG("%s: METRICS->telegraf_listen_port : %d", LOG_TAG_SFSTATUS, config->telegraf_listen_port);
LOG_DEBUG("%s: METRICS->telegraf_bind_address : %s", LOG_TAG_SFSTATUS, config->telegraf_bind_address);
struct sf_status *sf_status_create(const char *profile, struct kafka *kfk)
{
struct sf_status *handle = (struct sf_status *)calloc(1, sizeof(struct sf_status));
if (!handle)
{
return NULL;
}
MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000);
MESA_load_profile_string_def(profile, "metrics", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), "");
MESA_load_profile_string_def(profile, "metrics", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), "");
MESA_load_profile_string_def(profile, "metrics", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), "");
const struct fieldstat_tag tags[] = {
{"data_center", TAG_CSTRING, {.value_str = handle->cfg.data_center}},
{"device_group", TAG_CSTRING, {.value_str = handle->cfg.device_group}},
{"device_id", TAG_CSTRING, {.value_str = handle->cfg.device_id}},
};
handle->kfk = kfk;
handle->fs = fieldstat_easy_new(1, "service_function_status", tags, sizeof(tags) / sizeof(tags[0]));
if (!handle->fs)
{
goto error_out;
}
handle->sf_status_idx = fieldstat_easy_register_counter(handle->fs, "sf_status");
handle->sf_latency_idx = fieldstat_easy_register_counter(handle->fs, "sf_latency_us");
return handle;
error_out:
sf_status_destory(handle);
return NULL;
}
void sf_status_destory(struct sf_status *handle)
{
if (handle)
{
if (handle->sockfd)
{
close(handle->sockfd);
handle->sockfd = -1;
}
struct node *temp = NULL;
struct node *node = NULL;
struct metric *temp = NULL;
struct metric *node = NULL;
HASH_ITER(hh, handle->htable, node, temp)
{
HASH_DELETE(hh, handle->htable, node);
@@ -71,150 +94,99 @@ void sf_status_destory(struct sf_status *handle)
node = NULL;
}
handle->htable_elem_count = 0;
if (handle->fs)
{
fieldstat_easy_free(handle->fs);
handle->fs = NULL;
}
free(handle);
handle = NULL;
}
}
struct sf_status *sf_status_create(const char *profile)
void sf_status_delete(struct sf_status *handle, const struct sf_status_key *key)
{
struct sf_status *handle = (struct sf_status *)calloc(1, sizeof(struct sf_status));
assert(handle);
sf_status_parse_config(profile, &(handle->config));
if (handle->config.enable == 0)
{
return handle;
}
handle->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
handle->sock_addr.sin_family = AF_INET;
handle->sock_addr.sin_port = htons(handle->config.telegraf_listen_port);
handle->sock_addr.sin_addr.s_addr = inet_addr(handle->config.telegraf_bind_address);
handle->htable_elem_count = 0;
if (handle->sockfd == -1)
{
LOG_ERROR("%s: failed to create udp sockfd %s:%d, errno: %d, %s", LOG_TAG_SFSTATUS, handle->config.telegraf_bind_address, handle->config.telegraf_listen_port, errno, strerror(errno));
sf_status_destory(handle);
return NULL;
}
return handle;
}
void sf_status_reset(struct sf_status *handle)
{
if (handle == NULL || handle->config.enable == 0)
if (!handle)
{
return;
}
LOG_DEBUG("%s: reset: elem_num %lu", LOG_TAG_SFSTATUS, handle->htable_elem_count);
struct node *temp = NULL;
struct node *node = NULL;
HASH_ITER(hh, handle->htable, node, temp)
{
HASH_DELETE(hh, handle->htable, node);
free(node);
node = NULL;
handle->htable_elem_count--;
}
}
void sf_status_delete(struct sf_status *handle, int sf_profile_id)
{
if (handle == NULL || handle->config.enable == 0)
{
return;
}
struct node *temp = NULL;
HASH_FIND(hh, handle->htable, &sf_profile_id, sizeof(sf_profile_id), temp);
struct metric *temp = NULL;
HASH_FIND(hh, handle->htable, key, sizeof(struct sf_status_key), temp);
if (temp)
{
handle->htable_elem_count--;
LOG_DEBUG("%s: delete: sf_profile %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, handle->htable_elem_count);
HASH_DELETE(hh, handle->htable, temp);
free(temp);
temp = NULL;
}
else
{
LOG_DEBUG("%s: delete: sf_profile %d not exists, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, handle->htable_elem_count);
}
}
void sf_status_update(struct sf_status *handle, int sf_vsys_id, int sf_profile_id, int sf_status, int sf_latency)
void sf_status_update(struct sf_status *handle, const struct sf_status_key *key, int sf_status, int sf_latency)
{
if (handle == NULL || handle->config.enable == 0)
if (!handle)
{
return;
}
struct node *temp = NULL;
HASH_FIND(hh, handle->htable, &sf_profile_id, sizeof(sf_profile_id), temp);
struct metric *temp = NULL;
HASH_FIND(hh, handle->htable, key, sizeof(struct sf_status_key), temp);
if (temp)
{
if (temp->sf_status != sf_status)
{
LOG_DEBUG("%s: update: sf_profile %d status %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, sf_status, handle->htable_elem_count);
}
temp->sf_vsys_id = sf_vsys_id;
temp->sf_profile_id = sf_profile_id;
temp->sf_status = sf_status;
temp->sf_latency = sf_latency;
}
else
{
handle->htable_elem_count++;
LOG_DEBUG("%s: insert: sf_profile %d status %d success, elem_num %lu", LOG_TAG_SFSTATUS, sf_profile_id, sf_status, handle->htable_elem_count);
temp = (struct node *)calloc(1, sizeof(struct node));
temp->sf_vsys_id = sf_vsys_id;
temp->sf_profile_id = sf_profile_id;
temp = (struct metric *)calloc(1, sizeof(struct metric));
temp->key.vsys_id = key->vsys_id;
temp->key.sf_profile_id = key->sf_profile_id;
temp->sf_status = sf_status;
temp->sf_latency = sf_latency;
HASH_ADD(hh, handle->htable, sf_profile_id, sizeof(sf_profile_id), temp);
HASH_ADD(hh, handle->htable, key, sizeof(struct sf_status_key), temp);
}
}
void sf_status_send(struct sf_status *handle)
void sf_status_output(struct sf_status *handle)
{
char buff[2048];
int nsend = 0;
int size = sizeof(buff);
struct node *temp = NULL;
struct node *node = NULL;
if (handle == NULL || handle->config.enable == 0)
if (!handle)
{
return;
}
struct metric *temp = NULL;
struct metric *node = NULL;
HASH_ITER(hh, handle->htable, node, temp)
{
memset(buff, 0, size);
nsend = snprintf(buff, size, SCE_SF_STATUS,
node->sf_vsys_id,
node->sf_profile_id,
node->sf_status,
node->sf_latency);
sendto(handle->sockfd, buff, nsend, 0, (struct sockaddr *)&handle->sock_addr, sizeof(handle->sock_addr));
const struct fieldstat_tag tags[] = {
{"vsys_id", TAG_INTEGER, {.value_longlong = node->key.vsys_id}},
{"sf_profile_id", TAG_INTEGER, {.value_longlong = node->key.sf_profile_id}},
};
fieldstat_easy_counter_set(handle->fs, 0, handle->sf_status_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sf_status);
fieldstat_easy_counter_set(handle->fs, 0, handle->sf_latency_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sf_latency);
}
char **ptr = NULL;
size_t len = 0;
fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len);
if (ptr)
{
for (size_t i = 0; i < len; i++)
{
kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i]));
free(ptr[i]);
ptr[i] = NULL;
}
free(ptr);
}
}
int sf_status_get_interval(struct sf_status *handle)
int sf_status_get_ouput_interval_ms(struct sf_status *handle)
{
if (handle == NULL)
if (!handle)
{
return 0;
}
else
{
return handle->config.interval_s;
}
return handle->cfg.output_kafka_interval_ms;
}

View File

@@ -4,12 +4,36 @@
TEST(SF_STATUS, TEST)
{
struct sf_status *status = sf_status_create("./test_resource/sce.conf");
EXPECT_TRUE(sf_status_get_interval(status) == 1);
sf_status_update(status, 11, 1, 0, 0);
sf_status_update(status, 22, 2, 1, 1);
sf_status_send(status);
struct kafka *kfk = kafka_create("./test_resource/sce.conf");
EXPECT_TRUE(kfk != NULL);
struct sf_status *status = sf_status_create("./test_resource/sce.conf", kfk);
EXPECT_TRUE(status != NULL);
EXPECT_TRUE(sf_status_get_ouput_interval_ms(status) == 1000);
struct sf_status_key key1 = {0};
key1.vsys_id = 11;
key1.sf_profile_id = 12;
struct sf_status_key key2 = {0};
key2.vsys_id = 21;
key2.sf_profile_id = 22;
sf_status_update(status, &key1, 1, 2);
sf_status_update(status, &key2, 2, 1);
printf("\n========================================\n expect key1 + key2 \n========================================\n");
sf_status_output(status);
sf_status_delete(status, &key1);
printf("\n========================================\n expect only key2 \n========================================\n");
sf_status_output(status);
sf_status_delete(status, &key2);
printf("\n========================================\n expect no output \n========================================\n");
sf_status_output(status);
sf_status_destory(status);
kafka_destroy(kfk);
}
int main(int argc, char **argv)

View File

@@ -23,11 +23,6 @@ redis_server=127.0.0.1
redis_port_range=6379
[metrics]
# Kafka Topic: POLICY-RULE-METRIC
enable=1
interval_s=1
telegraf_bind_address=127.0.0.1
telegraf_listen_port=8300
output_fs_interval_ms=500
output_kafka_interval_ms=1000
data_center=center-xxg-tsgx

View File

@@ -15,7 +15,6 @@ extern "C"
#include "vxlan.h"
#include "packet_io.h"
#include "sf_metrics.h"
#include "health_check.h"
#include "global_metrics.h"
#include "gmock_marsio.h"
@@ -87,7 +86,6 @@ inline struct gtest_frame *gtest_frame_new(const char *json_file, const char *de
system(cmdline);
EXPECT_TRUE(LOG_INIT("./conf/zlog.conf") == 0);
health_check_session_init(profile);
sce_ctx = sce_ctx_create(profile);
EXPECT_TRUE(sce_ctx != nullptr);

View File

@@ -70,11 +70,6 @@ prometheus_listen_port=9001
prometheus_listen_url=/sce_prometheus
[metrics]
# Kafka Topic: POLICY-RULE-METRIC
enable=0
interval_s=1
telegraf_bind_address=127.0.0.1
telegraf_listen_port=8300
output_fs_interval_ms=500
output_kafka_interval_ms=1000
data_center=center-xxg-tsgx