diff --git a/common/src/kafka.cpp b/common/src/kafka.cpp index 2a06c92..6fd93a4 100644 --- a/common/src/kafka.cpp +++ b/common/src/kafka.cpp @@ -9,9 +9,6 @@ #define MAX_SYMBOL_LEN 128 -#define KAFKA_LOG_ERROR(fmt, ...) LOG_ERROR("%s: " fmt, LOG_TAG_KAFKA, ##__VA_ARGS__) -#define KAFKA_LOG_DEBUG(fmt, ...) LOG_DEBUG("%s: " fmt, LOG_TAG_KAFKA, ##__VA_ARGS__) - struct config { char brokerlist[MAX_SYMBOL_LEN]; @@ -69,44 +66,44 @@ static struct per_producer_per_topic *per_producer_per_topic_new(const char *bro rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { - KAFKA_LOG_ERROR("failed to create kafka conf"); + LOG_ERROR("%s: failed to create kafka conf", LOG_TAG_KAFKA); goto error_out; } if (rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK) { - KAFKA_LOG_ERROR("failed to set kafka queue.buffering.max.messages, %s", err_str); + LOG_ERROR("%s: failed to set kafka queue.buffering.max.messages, %s", LOG_TAG_KAFKA, err_str); goto error_out; } if (rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK) { - KAFKA_LOG_ERROR("failed to set kafka topic.metadata.refresh.interval.ms, %s", err_str); + LOG_ERROR("%s: failed to set kafka topic.metadata.refresh.interval.ms, %s", LOG_TAG_KAFKA, err_str); goto error_out; } if (rd_kafka_conf_set(conf, "client.id", topic_name, err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK) { - KAFKA_LOG_ERROR("failed to set kafka client.id, %s", err_str); + LOG_ERROR("%s: failed to set kafka client.id, %s", LOG_TAG_KAFKA, err_str); goto error_out; } if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0) { if (rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK) { - KAFKA_LOG_ERROR("failed to set kafka security.protocol, %s", err_str); + LOG_ERROR("%s: failed to set kafka security.protocol, %s", LOG_TAG_KAFKA, err_str); goto error_out; } if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK) { - KAFKA_LOG_ERROR("failed to set kafka sasl.mechanisms, %s", err_str); + LOG_ERROR("%s: failed to set kafka sasl.mechanisms, %s", LOG_TAG_KAFKA, err_str); goto error_out; } if (rd_kafka_conf_set(conf, "sasl.username", sasl_username, err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK) { - KAFKA_LOG_ERROR("failed to set kafka sasl.username, %s", err_str); + LOG_ERROR("%s: failed to set kafka sasl.username, %s", LOG_TAG_KAFKA, err_str); goto error_out; } if (rd_kafka_conf_set(conf, "sasl.password", sasl_passwd, err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK) { - KAFKA_LOG_ERROR("failed to set kafka sasl.password, %s", err_str); + LOG_ERROR("%s: failed to set kafka sasl.password, %s", LOG_TAG_KAFKA, err_str); goto error_out; } } @@ -114,7 +111,7 @@ static struct per_producer_per_topic *per_producer_per_topic_new(const char *bro { if (rd_kafka_conf_set(conf, "security.protocol", "plaintext", err_str, sizeof(err_str)) != RD_KAFKA_CONF_OK) { - KAFKA_LOG_ERROR("failed to set kafka security.protocol, %s", err_str); + LOG_ERROR("%s: failed to set kafka security.protocol, %s", LOG_TAG_KAFKA, err_str); goto error_out; } } @@ -124,20 +121,20 @@ static struct per_producer_per_topic *per_producer_per_topic_new(const char *bro conf = NULL; if (pppt->producer == NULL) { - KAFKA_LOG_ERROR("failed to create kafka producer, %s", err_str); + LOG_ERROR("%s: failed to create kafka producer, %s", LOG_TAG_KAFKA, err_str); goto error_out; } if (rd_kafka_brokers_add(pppt->producer, brokerlist) == 0) { - KAFKA_LOG_ERROR("failed to add kafka brokers"); + LOG_ERROR("%s: failed to add kafka brokers", LOG_TAG_KAFKA); goto error_out; } pppt->topic = rd_kafka_topic_new(pppt->producer, topic_name, NULL); if (pppt->topic == NULL) { - KAFKA_LOG_ERROR("failed to create kafka topic: %s", topic_name); + LOG_ERROR("%s: failed to create kafka topic: %s", LOG_TAG_KAFKA, topic_name); goto error_out; } @@ -173,7 +170,7 @@ struct kafka *kafka_create(const char *profile) if (strlen(handle->cfg.brokerlist) == 0) { - KAFKA_LOG_ERROR("brokerlist is empty"); + LOG_ERROR("%s: brokerlist is empty", LOG_TAG_KAFKA); goto error_out; } @@ -181,7 +178,7 @@ struct kafka *kafka_create(const char *profile) { if (strlen(handle->cfg.topic_name[i]) == 0) { - KAFKA_LOG_ERROR("topic_name[%d] is empty", i); + LOG_ERROR("%s: topic_name[%d] is empty", LOG_TAG_KAFKA, i); goto error_out; } } @@ -217,11 +214,11 @@ void kafka_destroy(struct kafka *handle) } } -int kafka_send(struct kafka *handle, enum topic_idx idx, const void *data, int len) +int kafka_send(struct kafka *handle, enum topic_idx idx, const char *data, int len) { if (idx < 0 || idx >= MAX_TOPIC_NUM) { - KAFKA_LOG_ERROR("invalid topic index: %d", idx); + LOG_ERROR("%s: invalid topic index: %d", LOG_TAG_KAFKA, idx); return -1; } @@ -229,17 +226,18 @@ int kafka_send(struct kafka *handle, enum topic_idx idx, const void *data, int l { if (rd_kafka_produce(handle->pppt[idx]->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL) == -1) { - KAFKA_LOG_ERROR("failed to produce message with topic [%d], %s", idx, rd_kafka_err2str(rd_kafka_last_error())); + LOG_ERROR("%s: failed to produce message with topic [%d], %s", LOG_TAG_KAFKA, idx, rd_kafka_err2str(rd_kafka_last_error())); return -1; } else { + LOG_DEBUG("%s: success to produce message with topic [%d], %s", LOG_TAG_KAFKA, idx, data); return 0; } } else { - KAFKA_LOG_ERROR("topic %d not initialized", idx); + LOG_ERROR("%s: topic %d not initialized", LOG_TAG_KAFKA, idx); return -1; } } \ No newline at end of file diff --git a/conf/sce.conf b/conf/sce.conf index 91dd87f..6fc6e46 100644 --- a/conf/sce.conf +++ b/conf/sce.conf @@ -76,6 +76,11 @@ enable=1 interval_s=1 telegraf_bind_address=127.0.0.1 telegraf_listen_port=8300 +output_fs_interval_ms=500 +output_kafka_interval_ms=1000 +data_center=center-xxg-tsgx +device_group=group-xxg-tsgx +device_id=9800165603247024 [bfdd] enable=1 diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index c66ffd1..46055ca 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -3,6 +3,7 @@ target_link_libraries(platform PUBLIC common) target_link_libraries(platform PUBLIC pthread) target_link_libraries(platform PUBLIC MESA_prof_load) target_link_libraries(platform PUBLIC MESA_field_stat) +target_link_libraries(platform PUBLIC fieldstat4) target_link_libraries(platform PUBLIC breakpad_mini) target_link_libraries(platform PUBLIC maatframe) target_link_libraries(platform PUBLIC mrzcpd) diff --git a/platform/include/sce.h b/platform/include/sce.h index 63ddd7d..15c3e80 100644 --- a/platform/include/sce.h +++ b/platform/include/sce.h @@ -13,6 +13,7 @@ extern "C" #include "timestamp.h" #include "packet_io.h" #include "session_table.h" +#include "sf_metrics.h" #include "global_metrics.h" #define MAX_THREAD_NUM 256 @@ -26,7 +27,6 @@ struct thread_ctx pthread_t tid; int thread_index; - struct sf_metrics *sf_metrics; struct session_table *session_table; struct packet_io *ref_io; @@ -103,6 +103,7 @@ struct sce_ctx struct kafka *kfk; struct timestamp *ts; struct packet_io *io; + struct sf_metrics *sf_metrics; struct global_metrics *metrics; struct policy_enforcer *enforcer; struct thread_ctx work_threads[MAX_THREAD_NUM]; diff --git a/platform/include/sf_metrics.h b/platform/include/sf_metrics.h index 04feb01..4481d7d 100644 --- a/platform/include/sf_metrics.h +++ b/platform/include/sf_metrics.h @@ -7,6 +7,7 @@ extern "C" #endif #include +#include "kafka.h" struct sf_metrics_key { @@ -16,12 +17,12 @@ struct sf_metrics_key uint32_t vsys_id; }; -struct sf_metrics *sf_metrics_create(const char *profile); +struct sf_metrics *sf_metrics_create(const char *profile, struct kafka *kfk); void sf_metrics_destory(struct sf_metrics *handle); -void sf_metrics_reset(struct sf_metrics *handle); -void sf_metrics_inc(struct sf_metrics *handle, struct sf_metrics_key *key, uint64_t rx_pkts, uint64_t rx_bytes, uint64_t tx_pkts, uint64_t tx_bytes); -void sf_metrics_send(struct sf_metrics *handle); +void sf_metrics_reset(struct sf_metrics *handle, uint16_t thr_idx); +void sf_metrics_input(struct sf_metrics *handle, uint16_t thr_idx, struct sf_metrics_key *key, uint64_t rx_pkts, uint64_t rx_bytes, uint64_t tx_pkts, uint64_t tx_bytes); +void sf_metrics_output(struct sf_metrics *handle, uint16_t thr_idx); int sf_metrics_get_interval(struct sf_metrics *handle); #ifdef __cplusplus diff --git a/platform/src/main.cpp b/platform/src/main.cpp index f963ebf..6cf813e 100644 --- a/platform/src/main.cpp +++ b/platform/src/main.cpp @@ -65,7 +65,7 @@ static void *worker_thread_cycle(void *arg) struct packet_io *handle = thread_ctx->ref_io; struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx; struct timestamp *ts = sce_ctx->ts; - struct sf_metrics *sf_metrics = thread_ctx->sf_metrics; + struct sf_metrics *sf_metrics = sce_ctx->sf_metrics; struct session_table *session_table = thread_ctx->session_table; struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics; struct global_metrics *global_metrics = thread_ctx->ref_global_metrics; @@ -116,8 +116,8 @@ static void *worker_thread_cycle(void *arg) if (timestamp_get_msec(ts) - sf_metrics_last_send_ts >= sf_metrics_send_interval) { - sf_metrics_send(sf_metrics); - sf_metrics_reset(sf_metrics); + sf_metrics_output(sf_metrics, thread_index); + sf_metrics_reset(sf_metrics, thread_index); sf_metrics_last_send_ts = timestamp_get_msec(ts); } } @@ -197,7 +197,6 @@ int main(int argc, char **argv) ctx->work_threads[i].tid = 0; ctx->work_threads[i].thread_index = i; ctx->work_threads[i].session_table = session_table_create(); - ctx->work_threads[i].sf_metrics = sf_metrics_create(profile); ctx->work_threads[i].ref_io = ctx->io; ctx->work_threads[i].ref_global_metrics = ctx->metrics; ctx->work_threads[i].ref_enforcer = ctx->enforcer; @@ -242,7 +241,6 @@ error_out: { struct thread_ctx *thread_ctx = &ctx->work_threads[i]; session_table_destory(thread_ctx->session_table); - sf_metrics_destory(thread_ctx->sf_metrics); break; } else diff --git a/platform/src/packet_io.cpp b/platform/src/packet_io.cpp index c75f2ec..f8a73e7 100644 --- a/platform/src/packet_io.cpp +++ b/platform/src/packet_io.cpp @@ -649,6 +649,7 @@ static inline void action_mirr_forward(struct session_ctx *session_ctx, marsio_b { struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics; struct packet_io *packet_io = thread_ctx->ref_io; + struct sf_metrics *sf_metrics = thread_ctx->ref_sce_ctx->sf_metrics; int thread_index = thread_ctx->thread_index; char *raw_data = marsio_buff_mtod(rx_buff); @@ -671,7 +672,7 @@ static inline void action_mirr_forward(struct session_ctx *session_ctx, marsio_b key.sff_profile_id = sf->sff_profile_id; key.sf_profile_id = sf->sf_profile_id; key.vsys_id = sf->rule_vsys_id; - sf_metrics_inc(thread_ctx->sf_metrics, &key, 0, 0, 1, nsend); + sf_metrics_input(sf_metrics, thread_index, &key, 0, 0, 1, nsend); } static inline void action_stee_bypass(marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) @@ -695,6 +696,8 @@ static inline void action_stee_block(marsio_buff_t *rx_buff, struct metadata *me static inline void action_stee_forward(struct session_ctx *session_ctx, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, struct thread_ctx *thread_ctx) { struct thread_metrics *thread_metrics = &thread_ctx->thread_metrics; + struct sf_metrics *sf_metrics = thread_ctx->ref_sce_ctx->sf_metrics; + int thread_index = thread_ctx->thread_index; int nsend = send_packet_to_sf(session_ctx, rx_buff, meta, sf, thread_ctx); THROUGHPUT_METRICS_INC(&(thread_metrics->stee_tx), 1, meta->raw_len); @@ -704,7 +707,7 @@ static inline void action_stee_forward(struct session_ctx *session_ctx, marsio_b key.sff_profile_id = sf->sff_profile_id; key.sf_profile_id = sf->sf_profile_id; key.vsys_id = sf->rule_vsys_id; - sf_metrics_inc(thread_ctx->sf_metrics, &key, 0, 0, 1, nsend); + sf_metrics_input(sf_metrics, thread_index, &key, 0, 0, 1, nsend); } static void action_sf_chaining(struct thread_ctx *thread_ctx, struct session_ctx *session_ctx, struct selected_chaining *chaining, marsio_buff_t *rx_buff, struct metadata *meta, int next_sf_index) @@ -1234,6 +1237,7 @@ static void handle_inject_vxlan_packet(marsio_buff_t *rx_buff, struct thread_ctx struct packet_io *packet_io = thread_ctx->ref_io; int thread_index = thread_ctx->thread_index; struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx; + struct sf_metrics *sf_metrics = sce_ctx->sf_metrics; struct metadata meta; struct vxlan_hdr *vxlan_hdr = NULL; @@ -1311,7 +1315,7 @@ static void handle_inject_vxlan_packet(marsio_buff_t *rx_buff, struct thread_ctx key.sff_profile_id = sf->sff_profile_id; key.sf_profile_id = sf->sf_profile_id; key.vsys_id = sf->rule_vsys_id; - sf_metrics_inc(thread_ctx->sf_metrics, &key, 1, raw_len, 0, 0); + sf_metrics_input(sf_metrics, thread_index, &key, 1, raw_len, 0, 0); } marsio_buff_adj(rx_buff, raw_len - meta.raw_len); diff --git a/platform/src/sce.cpp b/platform/src/sce.cpp index 4f7401a..fe4ba89 100644 --- a/platform/src/sce.cpp +++ b/platform/src/sce.cpp @@ -3,8 +3,6 @@ #include "sce.h" #include "log.h" -#include "kafka.h" -#include "global_metrics.h" char *memdup(const char *src, int len) { @@ -95,6 +93,12 @@ struct sce_ctx *sce_ctx_create(const char *profile) } sce_ctx->ts = timestamp_new(sce_ctx->ts_update_interval_ms); + sce_ctx->sf_metrics = sf_metrics_create(profile, sce_ctx->kfk); + if (sce_ctx->sf_metrics == NULL) + { + goto error_out; + } + sce_ctx->metrics = global_metrics_create(profile, sce_ctx->nr_worker_threads); if (sce_ctx->metrics == NULL) { @@ -132,10 +136,11 @@ void sce_ctx_destory(struct sce_ctx *sce_ctx) packet_io_destory(sce_ctx->io); policy_enforcer_destory(sce_ctx->enforcer); global_metrics_destory(sce_ctx->metrics); + sf_metrics_destory(sce_ctx->sf_metrics); timestamp_free(sce_ctx->ts); kafka_destroy(sce_ctx->kfk); free(sce_ctx); sce_ctx = NULL; } -} +} \ No newline at end of file diff --git a/platform/src/sf_metrics.cpp b/platform/src/sf_metrics.cpp index 3e81836..90d3200 100644 --- a/platform/src/sf_metrics.cpp +++ b/platform/src/sf_metrics.cpp @@ -2,23 +2,38 @@ #include #include #include -#include -#include #include +#include +#include "sce.h" #include "log.h" #include "utils.h" +#include "uthash.h" #include "sf_metrics.h" -#define SCE_SF_METRICS "service_chaining_rule_hits,vsys_id=%d,rule_id=%lu,sff_profile_id=%d,sf_profile_id=%d sent_pkts=%lu,sent_bytes=%lu,recv_pkts=%lu,recv_bytes=%lu" +/* + * due to per pakcet call fieldstat_easy_counter_incrby() is not performance friendly, + * we use a cache table to store the metrics data, and then per interval call fieldstat_easy_counter_incrby() + * + * +-----------------+ +---------------+ +---------------------------------+ + * | | per packet call | | per interval call | | + * | worker thread 1 | -> sf_metrics_input() -> | cache table 1 | -> sf_metrics_output() -> | fieldstat_easy_counter_incrby() |+-+ + * | | with thr idx 1 | | with thr idx 1 | with thr idx 1 | \ + * +-----------------+ +---------------+ +---------------------------------+ \ + * \ per interval + * +--------------> fieldstat to json + * / send json to kafka + * +-----------------+ +---------------+ +---------------------------------+ / + * | | per packet call | | per interval call | | / + * | worker thread N | -> sf_metrics_input() -> | cache table N | -> sf_metrics_output() -> | fieldstat_easy_counter_incrby() |+-+ + * | | with thr idx N | | with thr idx N | with thr idx N | + * +-----------------+ +---------------+ +---------------------------------+ + */ -// Must be defined before including uthash.h -#define HASH_KEYCMP(a, b, len) sf_metrics_key_cmp((struct sf_metrics_key *)(a), (struct sf_metrics_key *)(b)) -#include "uthash.h" - -struct node +struct metric { struct sf_metrics_key key; + uint64_t sent_pkts; uint64_t sent_bytes; uint64_t recv_pkts; @@ -27,193 +42,241 @@ struct node UT_hash_handle hh; }; -struct sf_metrics +struct config { - int enable; - int interval_s; - int telegraf_listen_port; - char telegraf_bind_address[2048]; - - struct sockaddr_in sock_addr; - int sockfd; - - struct node *htable; - uint64_t htable_elem_count; + uint16_t thr_num; + int output_fs_interval_ms; + int output_kafka_interval_ms; + char data_center[256]; + char device_group[256]; + char device_id[256]; }; -static inline int sf_metrics_key_cmp(struct sf_metrics_key *a, struct sf_metrics_key *b) +struct sf_metrics { - if (a->sf_profile_id != b->sf_profile_id) - { - return 1; - } + struct config cfg; - if (a->sff_profile_id != b->sff_profile_id) - { - return 1; - } + int sent_pkts_idx; + int sent_bytes_idx; + int recv_pkts_idx; + int recv_bytes_idx; - if (a->rule_id != b->rule_id) - { - return 1; - } + pthread_t tid; + int thr_is_runing; + int thr_need_exit; + struct kafka *kfk; + struct fieldstat_easy *fs; + struct metric *root[MAX_THREAD_NUM]; +}; - if (a->vsys_id != b->vsys_id) - { - return 1; - } +/****************************************************************************** + * Private API + ******************************************************************************/ - return 0; +static void *fs2kafka_thread_cycle(void *arg) +{ + struct sf_metrics *handle = (struct sf_metrics *)arg; + ATOMIC_SET(&handle->thr_is_runing, 1); + + char **ptr = NULL; + size_t len = 0; + while (!ATOMIC_READ(&handle->thr_need_exit)) + { + fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len); + if (ptr) + { + for (size_t i = 0; i < len; i++) + { + kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); + free(ptr[i]); + ptr[i] = NULL; + } + free(ptr); + } + + usleep(handle->cfg.output_fs_interval_ms * 1000); + } + ATOMIC_SET(&handle->thr_is_runing, 0); + + return NULL; } -struct sf_metrics *sf_metrics_create(const char *profile) +/****************************************************************************** + * Public API + ******************************************************************************/ + +struct sf_metrics *sf_metrics_create(const char *profile, struct kafka *kfk) { struct sf_metrics *handle = (struct sf_metrics *)calloc(1, sizeof(struct sf_metrics)); - assert(handle); - - MESA_load_profile_int_def(profile, "METRICS", "enable", &(handle->enable), 1); - MESA_load_profile_int_def(profile, "METRICS", "interval_s", &(handle->interval_s), 1); - MESA_load_profile_int_def(profile, "METRICS", "telegraf_listen_port", &(handle->telegraf_listen_port), 8300); - MESA_load_profile_string_def(profile, "METRICS", "telegraf_bind_address", handle->telegraf_bind_address, sizeof(handle->telegraf_bind_address), "127.0.0.1"); - - if (handle->enable == 0) + if (!handle) { - return handle; - } - - handle->sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - handle->sock_addr.sin_family = AF_INET; - handle->sock_addr.sin_port = htons(handle->telegraf_listen_port); - handle->sock_addr.sin_addr.s_addr = inet_addr(handle->telegraf_bind_address); - handle->htable_elem_count = 0; - if (handle->sockfd == -1) - { - LOG_ERROR("%s: failed to create udp sockfd %s:%d, errno: %d, %s", LOG_TAG_SFMETRICS, handle->telegraf_bind_address, handle->telegraf_listen_port, errno, strerror(errno)); - sf_metrics_destory(handle); return NULL; } + MESA_load_profile_int_def(profile, "system", "nr_worker_threads", (int *)&(handle->cfg.thr_num), 0); + MESA_load_profile_int_def(profile, "metrics", "output_fs_interval_ms", &(handle->cfg.output_fs_interval_ms), 500); + MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000); + MESA_load_profile_string_def(profile, "metrics", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), ""); + MESA_load_profile_string_def(profile, "metrics", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), ""); + MESA_load_profile_string_def(profile, "metrics", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), ""); + + const struct fieldstat_tag tags[] = { + {"data_center", TAG_CSTRING, {.value_str = handle->cfg.data_center}}, + {"device_group", TAG_CSTRING, {.value_str = handle->cfg.device_group}}, + {"device_id", TAG_CSTRING, {.value_str = handle->cfg.device_id}}, + }; + + handle->kfk = kfk; + handle->fs = fieldstat_easy_new(handle->cfg.thr_num, "service_chaining_rule_hits", tags, sizeof(tags) / sizeof(tags[0])); + if (!handle->fs) + { + goto error_out; + } + + handle->sent_pkts_idx = fieldstat_easy_register_counter(handle->fs, "sent_pkts"); + handle->sent_bytes_idx = fieldstat_easy_register_counter(handle->fs, "sent_bytes"); + handle->recv_pkts_idx = fieldstat_easy_register_counter(handle->fs, "recv_pkts"); + handle->recv_bytes_idx = fieldstat_easy_register_counter(handle->fs, "recv_bytes"); + + if (pthread_create(&handle->tid, NULL, fs2kafka_thread_cycle, (void *)handle) < 0) + { + goto error_out; + } + return handle; + +error_out: + sf_metrics_destory(handle); + return NULL; } void sf_metrics_destory(struct sf_metrics *handle) { if (handle) { - if (handle->sockfd) + ATOMIC_SET(&handle->thr_need_exit, 1); + while (ATOMIC_READ(&handle->thr_is_runing)) { - close(handle->sockfd); - handle->sockfd = -1; + usleep(1000); } - struct node *temp = NULL; - struct node *node = NULL; - HASH_ITER(hh, handle->htable, node, temp) + for (int i = 0; i < handle->cfg.thr_num; i++) { - HASH_DELETE(hh, handle->htable, node); - - free(node); - node = NULL; + sf_metrics_reset(handle, i); + } + + if (handle->fs) + { + fieldstat_easy_free(handle->fs); + handle->fs = NULL; } - handle->htable_elem_count = 0; free(handle); handle = NULL; } } -void sf_metrics_reset(struct sf_metrics *handle) +void sf_metrics_reset(struct sf_metrics *handle, uint16_t thr_idx) { if (handle == NULL) { return; } - if (handle->enable == 0) + if (thr_idx >= handle->cfg.thr_num) { + assert(0); return; } - struct node *temp = NULL; - struct node *node = NULL; - HASH_ITER(hh, handle->htable, node, temp) + struct metric *temp = NULL; + struct metric *node = NULL; + HASH_ITER(hh, handle->root[thr_idx], node, temp) { - HASH_DELETE(hh, handle->htable, node); + HASH_DELETE(hh, handle->root[thr_idx], node); free(node); node = NULL; - handle->htable_elem_count--; } } -void sf_metrics_inc(struct sf_metrics *handle, struct sf_metrics_key *key, uint64_t rx_pkts, uint64_t rx_bytes, uint64_t tx_pkts, uint64_t tx_bytes) +void sf_metrics_input(struct sf_metrics *handle, uint16_t thr_idx, struct sf_metrics_key *key, uint64_t rx_pkts, uint64_t rx_bytes, uint64_t tx_pkts, uint64_t tx_bytes) { - if (handle->enable == 0) + if (handle == NULL) { return; } - struct node *temp = NULL; - HASH_FIND(hh, handle->htable, key, sizeof(struct sf_metrics_key), temp); - if (temp) + if (thr_idx >= handle->cfg.thr_num) { - temp->recv_pkts += rx_pkts; - temp->recv_bytes += rx_bytes; - temp->sent_pkts += tx_pkts; - temp->sent_bytes += tx_bytes; + assert(0); + return; + } + + struct metric *node = NULL; + HASH_FIND(hh, handle->root[thr_idx], key, sizeof(struct sf_metrics_key), node); + if (node) + { + node->recv_pkts += rx_pkts; + node->recv_bytes += rx_bytes; + node->sent_pkts += tx_pkts; + node->sent_bytes += tx_bytes; } else { - temp = (struct node *)calloc(1, sizeof(struct node)); - temp->key.vsys_id = key->vsys_id; - temp->key.rule_id = key->rule_id; - temp->key.sff_profile_id = key->sff_profile_id; - temp->key.sf_profile_id = key->sf_profile_id; - temp->recv_pkts = rx_pkts; - temp->recv_bytes = rx_bytes; - temp->sent_pkts = tx_pkts; - temp->sent_bytes = tx_bytes; + node = (struct metric *)calloc(1, sizeof(struct metric)); + node->key.vsys_id = key->vsys_id; + node->key.rule_id = key->rule_id; + node->key.sff_profile_id = key->sff_profile_id; + node->key.sf_profile_id = key->sf_profile_id; - HASH_ADD(hh, handle->htable, key, sizeof(struct sf_metrics_key), temp); + node->recv_pkts = rx_pkts; + node->recv_bytes = rx_bytes; + node->sent_pkts = tx_pkts; + node->sent_bytes = tx_bytes; + + HASH_ADD(hh, handle->root[thr_idx], key, sizeof(struct sf_metrics_key), node); } } -void sf_metrics_send(struct sf_metrics *handle) +void sf_metrics_output(struct sf_metrics *handle, uint16_t thr_idx) { - char buff[2048]; - int nsend = 0; - int size = sizeof(buff); - - struct node *temp = NULL; - struct node *node = NULL; - - if (handle->enable == 0) + if (handle == NULL) { return; } - HASH_ITER(hh, handle->htable, node, temp) + if (thr_idx >= handle->cfg.thr_num) { - if (node->sent_pkts == 0 && node->recv_pkts == 0) + assert(0); + return; + } + + struct metric *temp = NULL; + struct metric *node = NULL; + HASH_ITER(hh, handle->root[thr_idx], node, temp) + { + if (node->sent_pkts == 0 && node->recv_pkts == 0 && + node->sent_bytes == 0 && node->recv_bytes == 0) { continue; } - memset(buff, 0, size); - nsend = snprintf(buff, size, SCE_SF_METRICS, - node->key.vsys_id, - node->key.rule_id, - node->key.sff_profile_id, - node->key.sf_profile_id, - node->sent_pkts, - node->sent_bytes, - node->recv_pkts, - node->recv_bytes); - sendto(handle->sockfd, buff, nsend, 0, (struct sockaddr *)&handle->sock_addr, sizeof(handle->sock_addr)); + const struct fieldstat_tag tags[] = { + {"vsys_id", TAG_INTEGER, {.value_longlong = node->key.vsys_id}}, + {"rule_id", TAG_INTEGER, {.value_longlong = (long long)node->key.rule_id}}, + {"sff_profile_id", TAG_INTEGER, {.value_longlong = node->key.sff_profile_id}}, + {"sf_profile_id", TAG_INTEGER, {.value_longlong = node->key.sf_profile_id}}, + }; + + fieldstat_easy_counter_incrby(handle->fs, thr_idx, handle->sent_pkts_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sent_pkts); + fieldstat_easy_counter_incrby(handle->fs, thr_idx, handle->sent_bytes_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sent_bytes); + fieldstat_easy_counter_incrby(handle->fs, thr_idx, handle->recv_pkts_idx, tags, sizeof(tags) / sizeof(tags[0]), node->recv_pkts); + fieldstat_easy_counter_incrby(handle->fs, thr_idx, handle->recv_bytes_idx, tags, sizeof(tags) / sizeof(tags[0]), node->recv_bytes); } } int sf_metrics_get_interval(struct sf_metrics *handle) { - return handle->interval_s; + return handle->cfg.output_fs_interval_ms; } \ No newline at end of file diff --git a/platform/test/gtest_sf_metrics.cpp b/platform/test/gtest_sf_metrics.cpp index c9dc998..d8a4352 100644 --- a/platform/test/gtest_sf_metrics.cpp +++ b/platform/test/gtest_sf_metrics.cpp @@ -1,29 +1,98 @@ #include +#include "kafka.h" #include "sf_metrics.h" -TEST(SF_METRICS, TEST) +#if 1 +TEST(SF_METRICS, TEST1) { - struct sf_metrics *metrics = sf_metrics_create("./test_resource/sce.conf"); - EXPECT_TRUE(sf_metrics_get_interval(metrics) == 1); + uint16_t thr_idx0 = 0; + uint16_t thr_idx1 = 1; + + struct kafka *kfk = kafka_create("./test_resource/sce.conf"); + EXPECT_TRUE(kfk != NULL); + struct sf_metrics *metrics = sf_metrics_create("./test_resource/sce.conf", kfk); + EXPECT_TRUE(metrics != NULL); struct sf_metrics_key key1 = {0}; - key1.rule_id = 1; - key1.sff_profile_id = 2; - key1.sf_profile_id = 3; - key1.vsys_id = 4; - sf_metrics_inc(metrics, &key1, 4, 5, 6, 7); - + key1.vsys_id = 1; + key1.rule_id = 2; + key1.sff_profile_id = 3; + key1.sf_profile_id = 4; struct sf_metrics_key key2 = {0}; - key2.rule_id = 1; - key2.sff_profile_id = 2; - key2.sf_profile_id = 3; key2.vsys_id = 4; - sf_metrics_inc(metrics, &key2, 4, 5, 6, 7); + key2.rule_id = 3; + key2.sff_profile_id = 2; + key2.sf_profile_id = 1; + + // thread 0 + // uint64_t rx_pkts, uint64_t rx_bytes, uint64_t tx_pkts, uint64_t tx_bytes); + sf_metrics_input(metrics, thr_idx0, &key1, 1, 2, 2, 4); + sf_metrics_input(metrics, thr_idx0, &key2, 2, 4, 1, 2); + sf_metrics_output(metrics, thr_idx0); + sf_metrics_reset(metrics, thr_idx0); + + printf("\n========================================\nworker thread 0 done\n========================================\n"); + sleep(3); + + // thread 1 + sf_metrics_input(metrics, thr_idx1, &key1, 2, 4, 1, 2); + sf_metrics_input(metrics, thr_idx1, &key2, 1, 2, 2, 4); + sf_metrics_output(metrics, thr_idx1); + sf_metrics_reset(metrics, thr_idx1); + + printf("\n========================================\nworker thread 1 done\n========================================\n"); + sleep(3); - sf_metrics_send(metrics); sf_metrics_destory(metrics); + kafka_destroy(kfk); } +#endif + +#if 1 +TEST(SF_METRICS, TEST2) +{ + uint16_t thr_idx0 = 0; + uint16_t thr_idx1 = 1; + + struct kafka *kfk = kafka_create("./test_resource/sce.conf"); + EXPECT_TRUE(kfk != NULL); + struct sf_metrics *metrics = sf_metrics_create("./test_resource/sce.conf", kfk); + EXPECT_TRUE(metrics != NULL); + + struct sf_metrics_key key1 = {0}; + key1.vsys_id = 1; + key1.rule_id = 2; + key1.sff_profile_id = 3; + key1.sf_profile_id = 4; + struct sf_metrics_key key2 = {0}; + key2.vsys_id = 4; + key2.rule_id = 3; + key2.sff_profile_id = 2; + key2.sf_profile_id = 1; + + // thread 0 + // uint64_t rx_pkts, uint64_t rx_bytes, uint64_t tx_pkts, uint64_t tx_bytes); + sf_metrics_input(metrics, thr_idx0, &key1, 1, 2, 2, 4); + sf_metrics_input(metrics, thr_idx0, &key2, 2, 4, 1, 2); + sf_metrics_output(metrics, thr_idx0); + sf_metrics_reset(metrics, thr_idx0); + + printf("\n========================================\nworker thread 0 done\n========================================\n"); + + // thread 1 + sf_metrics_input(metrics, thr_idx1, &key1, 2, 4, 1, 2); + sf_metrics_input(metrics, thr_idx1, &key2, 1, 2, 2, 4); + sf_metrics_output(metrics, thr_idx1); + sf_metrics_reset(metrics, thr_idx1); + + printf("\n========================================\nworker thread 1 done\n========================================\n"); + sleep(3); + + sf_metrics_destory(metrics); + kafka_destroy(kfk); +} +#endif int main(int argc, char **argv) { diff --git a/platform/test/test_resource/sce.conf b/platform/test/test_resource/sce.conf index 73c1734..04360ee 100644 --- a/platform/test/test_resource/sce.conf +++ b/platform/test/test_resource/sce.conf @@ -23,14 +23,25 @@ redis_server=127.0.0.1 redis_port_range=6379 [metrics] -# Kafka Topic: POLICY-RULE-METRICS +# Kafka Topic: POLICY-RULE-METRIC enable=1 interval_s=1 telegraf_bind_address=127.0.0.1 telegraf_listen_port=8300 +output_fs_interval_ms=500 +output_kafka_interval_ms=1000 +data_center=center-xxg-tsgx +device_group=group-xxg-tsgx +device_id=9800165603247024 [bfdd] path=/var/run/frr/bfdd.vty device=eth0 local_address=127.0.0.1 -gateway=127.0.0.1 \ No newline at end of file +gateway=127.0.0.1 + +[kafka] +brokerlist=192.168.40.224:9092 +sasl_username=admin +sasl_passwd=galaxy2019 +topic_name=POLICY-RULE-METRIC \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ce0c1f4..7d7d65c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -19,7 +19,7 @@ add_library(temp_platform ${CMAKE_SOURCE_DIR}/platform/src/sf_status.cpp) target_include_directories(temp_platform PUBLIC ${CMAKE_SOURCE_DIR}/common/include) target_include_directories(temp_platform PUBLIC ${CMAKE_SOURCE_DIR}/platform/include) -target_link_libraries(temp_platform PUBLIC common pthread cjson maatframe MESA_prof_load MESA_field_stat gmock_marsio) +target_link_libraries(temp_platform PUBLIC common pthread cjson maatframe MESA_prof_load MESA_field_stat fieldstat4 gmock_marsio) ############################################################################### # gtest_ctrl_pkt_opening diff --git a/test/gtest_utils.h b/test/gtest_utils.h index 27b3bc4..44ce420 100644 --- a/test/gtest_utils.h +++ b/test/gtest_utils.h @@ -95,7 +95,6 @@ inline struct gtest_frame *gtest_frame_new(const char *json_file, const char *de thread_ctx->tid = 0; thread_ctx->thread_index = 0; thread_ctx->session_table = session_table_create(); - thread_ctx->sf_metrics = sf_metrics_create(profile); thread_ctx->ref_io = sce_ctx->io; thread_ctx->ref_global_metrics = sce_ctx->metrics; thread_ctx->ref_enforcer = sce_ctx->enforcer; @@ -119,7 +118,6 @@ inline void gtest_frame_free(struct gtest_frame *instance) struct thread_ctx *thread_ctx = &sce_ctx->work_threads[0]; session_table_destory(thread_ctx->session_table); - sf_metrics_destory(thread_ctx->sf_metrics); sce_ctx_destory(sce_ctx); LOG_CLOSE(); diff --git a/test/test_data/conf/sce.conf b/test/test_data/conf/sce.conf index 5d82e30..2ad588b 100644 --- a/test/test_data/conf/sce.conf +++ b/test/test_data/conf/sce.conf @@ -75,6 +75,11 @@ enable=0 interval_s=1 telegraf_bind_address=127.0.0.1 telegraf_listen_port=8300 +output_fs_interval_ms=500 +output_kafka_interval_ms=1000 +data_center=center-xxg-tsgx +device_group=group-xxg-tsgx +device_id=9800165603247024 [bfdd] enable=0 diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index 816c8ba..77edd1e 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -56,6 +56,10 @@ add_library(MESA_field_stat SHARED IMPORTED GLOBAL) set_property(TARGET MESA_field_stat PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libMESA_field_stat2.so) set_property(TARGET MESA_field_stat PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) +add_library(fieldstat4 SHARED IMPORTED GLOBAL) +set_property(TARGET fieldstat4 PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libfieldstat4.so) +set_property(TARGET fieldstat4 PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}/fieldstat/) + add_library(maatframe SHARED IMPORTED GLOBAL) set_property(TARGET maatframe PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libmaatframe.so) set_property(TARGET maatframe PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR})