#include #include #include #include #include "tfe_stream.h" #include "tfe_resource.h" #include "tfe_packet_io.h" int tfe_fieldstat_intercept_incrby(struct fieldstat_easy_intercept *metrics, void *val_data, int thread_index) { int ret = 0; int hit_count = 0; uint16_t out_size = 0; struct session_ctx *s_ctx = (struct session_ctx *)val_data; struct tfe_cmsg *cmsg = s_ctx->cmsg; if (cmsg == NULL) { return 0; } int c2s_dir = s_ctx->c2s_info.is_e2i_dir; int c2s_rx_pkts = s_ctx->c2s_info.rx.n_pkts - s_ctx->c2s_info.rx_send_complete.n_pkts; int c2s_rx_bytes = s_ctx->c2s_info.rx.n_bytes - s_ctx->c2s_info.rx_send_complete.n_bytes; int s2c_dir = s_ctx->s2c_info.is_e2i_dir; int s2c_rx_pkts = s_ctx->s2c_info.rx.n_pkts - s_ctx->s2c_info.rx_send_complete.n_pkts; int s2c_rx_bytes = s_ctx->s2c_info.rx.n_bytes - s_ctx->s2c_info.rx_send_complete.n_bytes; s_ctx->c2s_info.rx_send_complete = s_ctx->c2s_info.rx; s_ctx->s2c_info.rx_send_complete = s_ctx->s2c_info.rx; if (c2s_rx_pkts == 0 && c2s_rx_bytes == 0 && s2c_rx_pkts == 0 && s2c_rx_bytes == 0) { return 0; } int vsys_id = 0; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_VSYS_ID, (unsigned char *)&vsys_id, sizeof(vsys_id), &out_size); if (ret != 0) { TFE_LOG_ERROR(g_default_logger, "failed at fetch vsys_id from cmsg: %s", strerror(-ret)); return 0; } uuid_t rule_id; char str_rule_id[UUID_STRING_SIZE] = {0}; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_POLICY_ID, (unsigned char *)rule_id, UUID_LEN, &out_size); if (ret != 0) { TFE_LOG_ERROR(g_default_logger, "failed at fetch rule_id from cmsg: %s", strerror(-ret)); return 0; } uuid_unparse(rule_id, str_rule_id); uint8_t hit_no_intercept = 0; ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_HIT_NO_INTERCEPT, (unsigned char *)&hit_no_intercept, sizeof(hit_no_intercept), &out_size); if (ret != 0) { TFE_LOG_ERROR(g_default_logger, "failed at fetch hit_no_intercept from cmsg: %s", strerror(-ret)); return 0; } if (s_ctx->metric_hit == 0 && s_ctx->send_log_flag) { s_ctx->metric_hit = 1; hit_count = 1; } int in_pkts = 0; int in_bytes = 0; int out_pkts = 0; int out_bytes = 0; // incoming : E2I 的流量 // outgoing : I2E 的流量 // first_ctr_packet_dir <==> client hello packet dir // 1: E2I 0:I2E if (c2s_dir == 1) { in_pkts += c2s_rx_pkts; in_bytes += c2s_rx_bytes; } else { out_pkts += c2s_rx_pkts; out_bytes += c2s_rx_bytes; } if (s2c_dir == 1) { in_pkts += s2c_rx_pkts; in_bytes += s2c_rx_bytes; } else { out_pkts += s2c_rx_pkts; out_bytes += s2c_rx_bytes; } int nr_tags = 0; struct field tags[5] = {0}; FIELDSTAT_TAG_INIT(tags, nr_tags, "vsys_id", FIELD_VALUE_INTEGER, vsys_id); nr_tags++; FIELDSTAT_TAG_STR(tags, nr_tags, "rule_uuid", FIELD_VALUE_CSTRING, str_rule_id); nr_tags++; uint8_t pinning_status = 0; if (tfe_cmsg_get_value(cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&pinning_status, sizeof(pinning_status), &out_size) == 0) { FIELDSTAT_TAG_INIT(tags, nr_tags, "pinning_status", FIELD_VALUE_INTEGER, pinning_status); nr_tags++; } // action : 2 Intercept; 3 No Intercept FIELDSTAT_TAG_INIT(tags, nr_tags, "action", FIELD_VALUE_INTEGER, (hit_no_intercept == 1 ? 3 : 2)); nr_tags++; if (hit_count > 0) fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->hit_count_idx, tags, (size_t)nr_tags, hit_count); if (in_pkts > 0) fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->in_pkts_idx, tags, (size_t)nr_tags, in_pkts); if (in_bytes > 0) fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->in_bytes_idx, tags, (size_t)nr_tags, in_bytes); if (out_pkts > 0) fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->out_pkts_idx, tags, (size_t)nr_tags, out_pkts); if (out_bytes > 0) fieldstat_easy_counter_incrby(metrics->fs, thread_index, metrics->out_bytes_idx, tags, (size_t)nr_tags, out_bytes); return 1; } int tfe_fieldstat_get_output_interval(struct fieldstat_easy_intercept *fieldstat) { return fieldstat->output_fs_interval_ms; } int tfe_fieldstat_manipulation_incrby(struct filedstat_easy_manipulation *fieldstat, unsigned int counter_id, long long value, const struct field tags[], int n_tags, int thread_id) { return fieldstat_easy_counter_incrby(fieldstat->fs, thread_id, counter_id, tags, (size_t)n_tags, value); } static void *tfe_fieldstat_thread_cycle(void *arg) { struct tfe_fieldstat_easy_t *tfe_fieldstat4_easy = (struct tfe_fieldstat_easy_t *)arg; ATOMIC_SET(&tfe_fieldstat4_easy->thr_is_runing, 1); char **ptr = NULL; size_t len = 0; while (!ATOMIC_READ(&tfe_fieldstat4_easy->thr_need_exit)) { if(tfe_fieldstat4_easy->intercept) { fieldstat_easy_output_array_and_reset(tfe_fieldstat4_easy->intercept->fs, &ptr, &len); if (ptr) { for (size_t i = 0; i < len; i++) { kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); free(ptr[i]); ptr[i] = NULL; } free(ptr); } } if(tfe_fieldstat4_easy->manipulation) { fieldstat_easy_output_array_and_reset(tfe_fieldstat4_easy->manipulation->fs, &ptr, &len); if (ptr) { for (size_t i = 0; i < len; i++) { kafka_send(tfe_get_kafka_handle(), TOPIC_RULE_HITS, ptr[i], strlen(ptr[i])); free(ptr[i]); ptr[i] = NULL; } free(ptr); } } usleep(tfe_fieldstat4_easy->output_kafka_interval_ms * 1000); } ATOMIC_SET(&tfe_fieldstat4_easy->thr_is_runing, 0); return NULL; } struct fieldstat_easy_intercept *tfe_fieldstat_easy_intercept_create(char *app_name, int max_thread, int output_fs_interval_ms, void *local_logger) { struct fieldstat_easy_intercept *fieldstat = ALLOC(struct fieldstat_easy_intercept, 1); const struct field tags[] = { {"data_center", FIELD_VALUE_CSTRING, {.value_str = tfe_get_data_center()}}, {"device_group", FIELD_VALUE_CSTRING, {.value_str = tfe_get_device_group()}}, {"device_id", FIELD_VALUE_CSTRING, {.value_str = tfe_get_device_id()}}, }; fieldstat->fs = fieldstat_easy_new(max_thread, app_name, tags, sizeof(tags) / sizeof(tags[0])); if (!fieldstat->fs) { TFE_LOG_ERROR(local_logger, "fieldstat4 easy intercept instance init failed."); FREE(&fieldstat); return NULL; } fieldstat->output_fs_interval_ms = output_fs_interval_ms; fieldstat->hit_count_idx = fieldstat_easy_register_counter(fieldstat->fs, "hit_count"); fieldstat->in_bytes_idx = fieldstat_easy_register_counter(fieldstat->fs, "in_bytes"); fieldstat->out_bytes_idx = fieldstat_easy_register_counter(fieldstat->fs, "out_bytes"); fieldstat->in_pkts_idx = fieldstat_easy_register_counter(fieldstat->fs, "in_pkts"); fieldstat->out_pkts_idx = fieldstat_easy_register_counter(fieldstat->fs, "out_pkts"); return fieldstat; } struct filedstat_easy_manipulation *tfe_fieldstat_easy_manipulation_create(char *app_name, char *outpath, int cycle, int max_thread, void *local_logger) { const char *counter_field[COLUMN_MAX] = {"hit_count", "in_bytes", "out_bytes", "in_pkts", "out_pkts"}; struct field metric_tags[TAG_MAX - 1] = {{"vsys_id", FIELD_VALUE_INTEGER, -1}, {"rule_id", FIELD_VALUE_INTEGER, -1}, {"action", FIELD_VALUE_INTEGER, -1}, {"sub_action", FIELD_VALUE_CSTRING, -1}}; struct filedstat_easy_manipulation *fieldstat = ALLOC(struct filedstat_easy_manipulation, 1); const struct field tags[] = { {"data_center", FIELD_VALUE_CSTRING, {.value_str = tfe_get_data_center()}}, {"device_group", FIELD_VALUE_CSTRING, {.value_str = tfe_get_device_group()}}, {"device_id", FIELD_VALUE_CSTRING, {.value_str = tfe_get_device_id()}}, }; fieldstat->fs = fieldstat_easy_new(max_thread, app_name, tags, sizeof(tags) / sizeof(tags[0])); if(!fieldstat->fs) { TFE_LOG_ERROR(local_logger, "fieldstat4 easy manipulation instance init failed."); FREE(&fieldstat); return NULL; } if(cycle > 0) { fieldstat_easy_enable_auto_output(fieldstat->fs, outpath, cycle); } for(int i=0; icounter_array[i]=fieldstat_easy_register_counter(fieldstat->fs, counter_field[i]); if(fieldstat->counter_array[i] < 0) { TFE_LOG_ERROR(local_logger, "fieldstat4 easy register counter failed."); FREE(&fieldstat); return NULL; } } fieldstat->tags = ALLOC(struct field*, max_thread); for (int i = 0; i < max_thread; i++) { fieldstat->tags[i] = ALLOC(struct field, TAG_MAX-1); memcpy(fieldstat->tags[i], metric_tags, sizeof(struct field) * (size_t)(TAG_MAX-1)); } return fieldstat; } struct tfe_fieldstat_easy_t *tfe_fieldstat_easy_create(int output_kafka_interval_ms) { struct tfe_fieldstat_easy_t *fieldstat = ALLOC(struct tfe_fieldstat_easy_t, 1); fieldstat->output_kafka_interval_ms = output_kafka_interval_ms; if (pthread_create(&fieldstat->tid, NULL, tfe_fieldstat_thread_cycle, (void *)fieldstat) < 0) { FREE(&fieldstat); return NULL; } return fieldstat; } void tfe_fieldstat_easy_destroy(struct tfe_fieldstat_easy_t *fieldstat) { if(fieldstat) { if(fieldstat->manipulation) { if(fieldstat->manipulation->fs) { fieldstat_easy_free(fieldstat->manipulation->fs); } for (int i = 0; i < fieldstat->manipulation->max_thread; i++) { if (fieldstat->manipulation->tags[i]) { FREE(&fieldstat->manipulation->tags[i]); } } FREE(&fieldstat->manipulation->tags); } if(fieldstat->intercept) { if(fieldstat->intercept->fs) { fieldstat_easy_free(fieldstat->intercept->fs); } } FREE(&fieldstat); } }