#include #include #include #include #include #include #include "sce.h" #include "log.h" #include "utils.h" #include "uthash.h" #include "sf_metrics.h" /* * due to per pakcet call fieldstat_easy_counter_incrby() is not performance friendly, * we use a cache table to store the metrics data, and then per interval call fieldstat_easy_counter_incrby() * * +-----------------+ +---------------+ +---------------------------------+ * | | per packet call | | per interval call | | * | worker thread 1 | -> sf_metrics_input() -> | cache table 1 | -> sf_metrics_output() -> | fieldstat_easy_counter_incrby() |+-+ * | | with thr idx 1 | | with thr idx 1 | with thr idx 1 | \ * +-----------------+ +---------------+ +---------------------------------+ \ * \ per interval * +--------------> fieldstat to json * / send json to kafka * +-----------------+ +---------------+ +---------------------------------+ / * | | per packet call | | per interval call | | / * | worker thread N | -> sf_metrics_input() -> | cache table N | -> sf_metrics_output() -> | fieldstat_easy_counter_incrby() |+-+ * | | with thr idx N | | with thr idx N | with thr idx N | * +-----------------+ +---------------+ +---------------------------------+ */ struct metric { struct sf_metrics_key key; uint64_t sent_pkts; uint64_t sent_bytes; uint64_t recv_pkts; uint64_t recv_bytes; UT_hash_handle hh; }; struct config { uint16_t thr_num; int output_fs_interval_ms; int output_kafka_interval_ms; char data_center[256]; char device_group[256]; char device_id[256]; }; struct sf_metrics { struct config cfg; int sent_pkts_idx; int sent_bytes_idx; int recv_pkts_idx; int recv_bytes_idx; pthread_t tid; int thr_is_runing; int thr_need_exit; struct kafka *kfk; struct fieldstat_easy *fs; struct metric *root[MAX_THREAD_NUM]; }; /****************************************************************************** * Private API ******************************************************************************/ static void *fs2kafka_thread_cycle(void *arg) { struct sf_metrics *handle = (struct sf_metrics *)arg; ATOMIC_SET(&handle->thr_is_runing, 1); char **ptr = NULL; size_t len = 0; while (!ATOMIC_READ(&handle->thr_need_exit)) { fieldstat_easy_output_array_and_reset(handle->fs, &ptr, &len); if (ptr) { for (size_t i = 0; i < len; i++) { kafka_send(handle->kfk, TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); free(ptr[i]); ptr[i] = NULL; } free(ptr); } usleep(handle->cfg.output_kafka_interval_ms * 1000); } ATOMIC_SET(&handle->thr_is_runing, 0); return NULL; } /****************************************************************************** * Public API ******************************************************************************/ struct sf_metrics *sf_metrics_create(const char *profile, struct kafka *kfk) { struct sf_metrics *handle = (struct sf_metrics *)calloc(1, sizeof(struct sf_metrics)); if (!handle) { return NULL; } MESA_load_profile_int_def(profile, "system", "nr_worker_threads", (int *)&(handle->cfg.thr_num), 0); MESA_load_profile_int_def(profile, "metrics", "output_fs_interval_ms", &(handle->cfg.output_fs_interval_ms), 500); MESA_load_profile_int_def(profile, "metrics", "output_kafka_interval_ms", &(handle->cfg.output_kafka_interval_ms), 1000); MESA_load_profile_string_def(profile, "metrics", "data_center", handle->cfg.data_center, sizeof(handle->cfg.data_center), ""); MESA_load_profile_string_def(profile, "metrics", "device_group", handle->cfg.device_group, sizeof(handle->cfg.device_group), ""); MESA_load_profile_string_def(profile, "metrics", "device_id", handle->cfg.device_id, sizeof(handle->cfg.device_id), ""); const struct field tags[] = { {"data_center", FIELD_VALUE_CSTRING, {.value_str = handle->cfg.data_center}}, {"device_group", FIELD_VALUE_CSTRING, {.value_str = handle->cfg.device_group}}, {"device_id", FIELD_VALUE_CSTRING, {.value_str = handle->cfg.device_id}}, }; handle->kfk = kfk; handle->fs = fieldstat_easy_new(handle->cfg.thr_num, "service_chaining_rule_hits", tags, sizeof(tags) / sizeof(tags[0])); if (!handle->fs) { goto error_out; } handle->sent_pkts_idx = fieldstat_easy_register_counter(handle->fs, "sent_pkts"); handle->sent_bytes_idx = fieldstat_easy_register_counter(handle->fs, "sent_bytes"); handle->recv_pkts_idx = fieldstat_easy_register_counter(handle->fs, "recv_pkts"); handle->recv_bytes_idx = fieldstat_easy_register_counter(handle->fs, "recv_bytes"); if (pthread_create(&handle->tid, NULL, fs2kafka_thread_cycle, (void *)handle) < 0) { goto error_out; } return handle; error_out: sf_metrics_destory(handle); return NULL; } void sf_metrics_destory(struct sf_metrics *handle) { if (handle) { ATOMIC_SET(&handle->thr_need_exit, 1); while (ATOMIC_READ(&handle->thr_is_runing)) { usleep(1000); } for (int i = 0; i < handle->cfg.thr_num; i++) { sf_metrics_reset(handle, i); } if (handle->fs) { fieldstat_easy_free(handle->fs); handle->fs = NULL; } free(handle); handle = NULL; } } void sf_metrics_reset(struct sf_metrics *handle, uint16_t thr_idx) { if (handle == NULL) { return; } if (thr_idx >= handle->cfg.thr_num) { assert(0); return; } struct metric *temp = NULL; struct metric *node = NULL; HASH_ITER(hh, handle->root[thr_idx], node, temp) { HASH_DELETE(hh, handle->root[thr_idx], node); free(node); node = NULL; } } void sf_metrics_input(struct sf_metrics *handle, uint16_t thr_idx, struct sf_metrics_key *key, uint64_t rx_pkts, uint64_t rx_bytes, uint64_t tx_pkts, uint64_t tx_bytes) { if (handle == NULL) { return; } if (thr_idx >= handle->cfg.thr_num) { assert(0); return; } struct metric *node = NULL; HASH_FIND(hh, handle->root[thr_idx], key, sizeof(struct sf_metrics_key), node); if (node) { node->recv_pkts += rx_pkts; node->recv_bytes += rx_bytes; node->sent_pkts += tx_pkts; node->sent_bytes += tx_bytes; } else { node = (struct metric *)calloc(1, sizeof(struct metric)); node->key.vsys_id = key->vsys_id; node->key.rule_id = key->rule_id; node->key.sff_profile_id = key->sff_profile_id; node->key.sf_profile_id = key->sf_profile_id; node->recv_pkts = rx_pkts; node->recv_bytes = rx_bytes; node->sent_pkts = tx_pkts; node->sent_bytes = tx_bytes; HASH_ADD(hh, handle->root[thr_idx], key, sizeof(struct sf_metrics_key), node); } } void sf_metrics_output(struct sf_metrics *handle, uint16_t thr_idx) { if (handle == NULL) { return; } if (thr_idx >= handle->cfg.thr_num) { assert(0); return; } struct metric *temp = NULL; struct metric *node = NULL; HASH_ITER(hh, handle->root[thr_idx], node, temp) { if (node->sent_pkts == 0 && node->recv_pkts == 0 && node->sent_bytes == 0 && node->recv_bytes == 0) { continue; } const struct field tags[] = { {"vsys_id", FIELD_VALUE_INTEGER, {.value_longlong = node->key.vsys_id}}, {"rule_id", FIELD_VALUE_INTEGER, {.value_longlong = (long long)node->key.rule_id}}, {"sff_profile_id", FIELD_VALUE_INTEGER, {.value_longlong = node->key.sff_profile_id}}, {"sf_profile_id", FIELD_VALUE_INTEGER, {.value_longlong = node->key.sf_profile_id}}, }; fieldstat_easy_counter_incrby(handle->fs, thr_idx, handle->sent_pkts_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sent_pkts); fieldstat_easy_counter_incrby(handle->fs, thr_idx, handle->sent_bytes_idx, tags, sizeof(tags) / sizeof(tags[0]), node->sent_bytes); fieldstat_easy_counter_incrby(handle->fs, thr_idx, handle->recv_pkts_idx, tags, sizeof(tags) / sizeof(tags[0]), node->recv_pkts); fieldstat_easy_counter_incrby(handle->fs, thr_idx, handle->recv_bytes_idx, tags, sizeof(tags) / sizeof(tags[0]), node->recv_bytes); } } int sf_metrics_get_interval(struct sf_metrics *handle) { return handle->cfg.output_fs_interval_ms; }