diff --git a/common/include/tfe_kafka_logger.h b/common/include/tfe_kafka_logger.h index de182ba..2bed406 100644 --- a/common/include/tfe_kafka_logger.h +++ b/common/include/tfe_kafka_logger.h @@ -9,26 +9,34 @@ extern "C" #include #include - typedef struct tfe_kafka_logger_s - { - int enable; - int t_vsys_id; +enum kafka_topic_type +{ + TOPIC_LOGGER, + TOPIC_BUCKET, + TOPIC_MAX +}; - unsigned int local_ip_num; - char local_ip_str[TFE_SYMBOL_MAX]; +typedef struct tfe_kafka_logger_s +{ + int enable; + int t_vsys_id; - char topic_name[TFE_STRING_MAX]; - char broker_list[TFE_STRING_MAX]; + unsigned int local_ip_num; + char local_ip_str[TFE_SYMBOL_MAX]; - rd_kafka_t *kafka_handle; - rd_kafka_topic_t *kafka_topic; - } tfe_kafka_logger_t; + char topic_name[TOPIC_MAX][TFE_STRING_MAX]; + char broker_list[TFE_STRING_MAX]; + + rd_kafka_t *kafka_handle; + rd_kafka_topic_t *kafka_topic[TOPIC_MAX]; +} tfe_kafka_logger_t; tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, const char *sasl_username, const char *sasl_passwd, void *local_logger); +int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, void *local_logger); void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger); -int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len); +int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len); #ifdef __cpluscplus } diff --git a/common/include/tfe_resource.h b/common/include/tfe_resource.h index 7006f49..d396bf3 100644 --- a/common/include/tfe_resource.h +++ b/common/include/tfe_resource.h @@ -1,5 +1,16 @@ #pragma once +struct app_id_dict +{ + int ref_cnt; + int app_id; + long long int group_id; + + pthread_mutex_t lock; +}; + +void app_id_dict_free(struct app_id_dict *app_dict); + enum RESOURCE_TYPE { STATIC_MAAT, @@ -16,6 +27,7 @@ enum TABLE_TYPE TABLE_SECURITY_SOURCE_LOCATION, TABLE_SECURITY_DESTINATION_LOCATION, TABLE_OBJ_SUBSCRIBER_ID, + TABLE_OBJ_APP_ID_DICT, TABLE_TYPE_MAX }; diff --git a/common/include/tfe_scan.h b/common/include/tfe_scan.h index 455d839..f5baa2b 100644 --- a/common/include/tfe_scan.h +++ b/common/include/tfe_scan.h @@ -11,3 +11,10 @@ int tfe_scan_ip_location(const struct tfe_stream *stream, long long *result, str int hit_cnt, void *logger, char **location_server, char **location_client); int tfe_scan_fqdn_cat(const struct tfe_stream *stream, long long *result, struct maat_state *scan_mid, int hit_cnt, void *logger, int table_id); +int tfe_scan_app_id(long long *result, struct maat_state *scan_mid, int hit_cnt, int app_id, int table_id); +int tfe_scan_ipv4_addr(long long *result, struct maat_state *scan_mid, int hit_cnt, struct ipaddr sapp_addr); +int tfe_scan_ipv6_addr(long long *result, struct maat_state *scan_mid, int hit_cnt, struct ipaddr sapp_addr); +int tfe_scan_ipv4_internal_addr(const struct tfe_stream *stream, long long *result, struct maat_state *scan_mid, + int hit_cnt, struct ipaddr sapp_addr); +int tfe_scan_ipv6_internal_addr(const struct tfe_stream *stream, long long *result, struct maat_state *scan_mid, + int hit_cnt, struct ipaddr sapp_addr); \ No newline at end of file diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp index 3a59595..d34a332 100644 --- a/common/src/tfe_kafka_logger.cpp +++ b/common/src/tfe_kafka_logger.cpp @@ -104,6 +104,21 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_ return handle; } +int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, void *local_logger) +{ + strncpy(logger->topic_name[TOPIC_BUCKET], topic_name, sizeof(logger->topic_name[TOPIC_BUCKET])-1); + logger->kafka_topic[TOPIC_BUCKET] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL); + if (logger->kafka_topic[TOPIC_BUCKET] == NULL) + { + TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name); + rd_kafka_destroy(logger->kafka_handle); + free(logger); + return 0; + } + + return 1; +} + tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, const char *sasl_username, const char *sasl_passwd, void *local_logger) { char *override_sled_ip=NULL; @@ -141,11 +156,11 @@ create_kafka: return NULL; } - strncpy(logger->topic_name, topic_name, sizeof(logger->topic_name)-1); - logger->kafka_topic = rd_kafka_topic_new(logger->kafka_handle, logger->topic_name, NULL); - if (logger->kafka_topic == NULL) + strncpy(logger->topic_name[TOPIC_LOGGER], topic_name, sizeof(logger->topic_name[TOPIC_LOGGER])-1); + logger->kafka_topic[TOPIC_LOGGER] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL); + if (logger->kafka_topic[TOPIC_LOGGER] == NULL) { - TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name); + TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name[TOPIC_LOGGER]); rd_kafka_destroy(logger->kafka_handle); free(logger); return NULL; @@ -161,18 +176,21 @@ void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger) if (logger->kafka_handle) rd_kafka_destroy(logger->kafka_handle); - if (logger->kafka_topic) - rd_kafka_topic_destroy(logger->kafka_topic); + if (logger->kafka_topic[TOPIC_LOGGER]) + rd_kafka_topic_destroy(logger->kafka_topic[TOPIC_LOGGER]); + + if (logger->kafka_topic[TOPIC_BUCKET]) + rd_kafka_topic_destroy(logger->kafka_topic[TOPIC_BUCKET]); free(logger); logger = NULL; } } -int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len) +int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len) { if (logger && logger->enable) - return rd_kafka_produce(logger->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL); + return rd_kafka_produce(logger->kafka_topic[topic_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL); else return 0; } diff --git a/common/src/tfe_resource.cpp b/common/src/tfe_resource.cpp index 9caae9a..2a4e73f 100644 --- a/common/src/tfe_resource.cpp +++ b/common/src/tfe_resource.cpp @@ -10,12 +10,7 @@ #define MAAT_INPUT_REDIS 1 #define MAAT_INPUT_FILE 2 -struct maat_table_info -{ - int id; - const char *name; -}; - +static int scan_table_id[TABLE_TYPE_MAX]; static struct maat *static_maat = NULL; static tfe_kafka_logger_t *kafka_logger = NULL; static struct tfe_fieldstat_metric_t *dynamic_fieldstat = NULL; @@ -184,7 +179,8 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char * int enable = 0, vsystem_id = 0; char nic_name[TFE_SYMBOL_MAX] = {0}; char brokerlist[TFE_STRING_MAX] = {0}; - char topic_name[TFE_STRING_MAX] = {0}; + char logger_topic[TFE_STRING_MAX] = {0}; + char bucket_topic[TFE_STRING_MAX] = {0}; char sasl_username[TFE_STRING_MAX] = {0}; char sasl_passwd[TFE_STRING_MAX] = {0}; tfe_kafka_logger_t *kafka_logger = NULL; @@ -193,7 +189,8 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char * MESA_load_profile_int_def(profile, section, "VSYSTEM_ID", &vsystem_id, 1); MESA_load_profile_string_def(profile, section, "NIC_NAME", nic_name, sizeof(nic_name), "eth0"); MESA_load_profile_string_def(profile, section, "KAFKA_BROKERLIST", brokerlist, sizeof(brokerlist), ""); - MESA_load_profile_string_def(profile, section, "KAFKA_TOPIC", topic_name, sizeof(topic_name), "POLICY-EVENT-LOG"); + MESA_load_profile_string_def(profile, section, "LOGGER_SEND_TOPIC", logger_topic, sizeof(logger_topic), "PROXY-EVENT"); + MESA_load_profile_string_def(profile, section, "FILE_BUCKET_TOPIC", bucket_topic, sizeof(bucket_topic), "TRAFFIC-FILE-STREAM-RECORD"); MESA_load_profile_string_def(profile, section, "SASL_USERNAME", sasl_username, sizeof(sasl_username), ""); MESA_load_profile_string_def(profile, section, "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), ""); @@ -203,18 +200,24 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char * return NULL; } - kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, topic_name, sasl_username, sasl_passwd, logger); + kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, logger_topic, sasl_username, sasl_passwd, logger); if (kafka_logger == NULL) { TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger."); return NULL; } - kafka_logger->t_vsys_id=vsystem_id; + int ret = tfe_kafka_logger_topic_new(kafka_logger, bucket_topic, logger); + if(ret == 0) + { + return NULL; + } + kafka_logger->t_vsys_id=vsystem_id; - TFE_LOG_INFO(logger, "tfe kafka logger : %s", enable ? "ENABLE" : "DISABLE"); - TFE_LOG_INFO(logger, "tfe kafka vsystem id : %d", vsystem_id); - TFE_LOG_INFO(logger, "tfe kafka topic : %s", topic_name); - TFE_LOG_INFO(logger, "tfe kafka brokerlist : %s", brokerlist); + TFE_LOG_INFO(logger, "tfe kafka logger : %s", enable ? "ENABLE" : "DISABLE"); + TFE_LOG_INFO(logger, "tfe kafka vsystem id : %d", vsystem_id); + TFE_LOG_INFO(logger, "tfe logger kafka topic : %s", logger_topic); + TFE_LOG_INFO(logger, "tfe bucket kafka topic : %s", bucket_topic); + TFE_LOG_INFO(logger, "tfe kafka brokerlist : %s", brokerlist); if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0) { @@ -302,25 +305,96 @@ static char* create_effective_device_tag(const char *profile, const char *sectio return effective_device_tag; } -static struct maat_table_info maat_pub_tables[TABLE_TYPE_MAX] = { - {0, "ATTR_SOURCE_ASN"}, - {0, "ATTR_DESTINATION_ASN"}, - {0, "ATTR_SOURCE_LOCATION"}, - {0, "ATTR_DESTINATION_LOCATION"}, - {0, "ATTR_SUBSCRIBER_ID"}}; - -static int register_maat_table() +void app_dict_table_new_cb(const char *table_name, int table_id, const char* key, const char* table_line, void **ad, long argl, void* argp) { - for (int i = 0; i < TABLE_TYPE_MAX; i++) - { - maat_pub_tables[i].id = maat_get_table_id(static_maat, maat_pub_tables[i].name); - if (maat_pub_tables[i].id < 0) - { - TFE_LOG_ERROR(g_default_logger, "Maat table %s register failed.", maat_pub_tables[i].name); - return -1; - } - } + int ret=0; + size_t offset=0, len=0; + char *app_id_str=NULL, *group_id_str=NULL; + struct app_id_dict *app_dict=ALLOC(struct app_id_dict, 1); + ret = maat_helper_read_column(table_line, 1, &offset, &len); + if(ret >= 0) + { + app_id_str=ALLOC(char, len+1); + memcpy(app_id_str, table_line+offset, len); + app_dict->app_id=atoi(app_id_str); + FREE(&app_id_str); + } + + ret = maat_helper_read_column(table_line, 18, &offset, &len); + if(ret >= 0) + { + group_id_str=ALLOC(char, len+1); + memcpy(group_id_str, table_line+offset, len); + app_dict->group_id=atoll(group_id_str); + FREE(&group_id_str); + } + + app_dict->ref_cnt=1; + pthread_mutex_init(&(app_dict->lock), NULL); + *ad=app_dict; + return; +} + +void app_dict_table_free_cb(int table_id, void **ad, long argl, void* argp) +{ + if(*ad==NULL) + { + return; + } + + struct app_id_dict *app_dict=(struct app_id_dict *)(*ad); + pthread_mutex_lock(&(app_dict->lock)); + app_dict->ref_cnt--; + if(app_dict->ref_cnt>0) + { + pthread_mutex_unlock(&(app_dict->lock)); + return; + } + pthread_mutex_unlock(&(app_dict->lock)); + pthread_mutex_destroy(&(app_dict->lock)); + + FREE(&app_dict); + *ad=NULL; + return; +} + +void app_id_dict_free(struct app_id_dict *app_dict) +{ + app_dict_table_free_cb(0, (void **)&app_dict, 0, NULL); +} + +void app_dict_table_dup_cb(int table_id, void **to, void **from, long argl, void* argp) +{ + struct app_id_dict *app_dict=(struct app_id_dict *)(*from); + pthread_mutex_lock(&(app_dict->lock)); + app_dict->ref_cnt++; + pthread_mutex_unlock(&(app_dict->lock)); + *to=app_dict; + + return; +} + +static int maat_common_table_init() +{ + const char * table_name[TABLE_TYPE_MAX]; + table_name[TABLE_SECURITY_SOURCE_ASN] = "ATTR_SOURCE_ASN"; + table_name[TABLE_SECURITY_DESTINATION_ASN]="ATTR_DESTINATION_ASN"; + table_name[TABLE_SECURITY_SOURCE_LOCATION] = "ATTR_SOURCE_LOCATION"; + table_name[TABLE_SECURITY_DESTINATION_LOCATION] = "ATTR_DESTINATION_LOCATION"; + table_name[TABLE_OBJ_SUBSCRIBER_ID] = "ATTR_SUBSCRIBER_ID"; + table_name[TABLE_OBJ_APP_ID_DICT] = "APP_ID_DICT"; + + for (int i = 0; i < TABLE_TYPE_MAX; i++) + { + scan_table_id[i] = maat_get_table_id(static_maat, table_name[i]); + if (scan_table_id[i] < 0) + { + TFE_LOG_ERROR(g_default_logger, "Maat table %s register failed.", table_name[i]); + return -1; + } + } + maat_plugin_table_ex_schema_register(static_maat, "APP_ID_DICT", app_dict_table_new_cb, app_dict_table_free_cb, app_dict_table_dup_cb, 0, NULL); return 0; } @@ -350,7 +424,7 @@ int tfe_bussiness_resouce_init() effective_device_tag = create_effective_device_tag(profile_path, "MAAT", g_default_logger); - if (register_maat_table()) + if (maat_common_table_init()) { return -1; } @@ -379,5 +453,5 @@ void *tfe_bussiness_resouce_get(enum RESOURCE_TYPE type) int tfe_bussiness_tableid_get(enum TABLE_TYPE type) { - return maat_pub_tables[type].id; + return scan_table_id[type]; } \ No newline at end of file diff --git a/common/src/tfe_scan.cpp b/common/src/tfe_scan.cpp index 6aace94..5d3edcc 100644 --- a/common/src/tfe_scan.cpp +++ b/common/src/tfe_scan.cpp @@ -1,6 +1,7 @@ #include #include #include +#include int tfe_scan_subscribe_id(const struct tfe_stream *stream, long long *result, struct maat_state *scan_mid, int hit_cnt, void *logger) @@ -43,6 +44,12 @@ int tfe_scan_subscribe_id(const struct tfe_stream *stream, long long *result, st TFE_LOG_INFO(logger, "Scan src TSG_OBJ_SUBSCRIBER_ID, NO hit subid: %s scan ret: %d addr: %s", source_subscribe_id, scan_ret, stream->str_stream_info); } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), tfe_bussiness_tableid_get(TABLE_OBJ_SUBSCRIBER_ID), + result + hit_cnt + hit_cnt_ip, MAX_SCAN_RESULT - hit_cnt - hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } } if (strlen(dest_subscribe_id)) @@ -61,6 +68,12 @@ int tfe_scan_subscribe_id(const struct tfe_stream *stream, long long *result, st TFE_LOG_INFO(logger, "Scan dst TSG_OBJ_SUBSCRIBER_ID, NO hit subid: %s scan ret: %d addr: %s", dest_subscribe_id, scan_ret, stream->str_stream_info); } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), tfe_bussiness_tableid_get(TABLE_OBJ_SUBSCRIBER_ID), + result + hit_cnt + hit_cnt_ip, MAX_SCAN_RESULT - hit_cnt - hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } } return hit_cnt_ip; @@ -118,7 +131,6 @@ int tfe_scan_fqdn_cat(const struct tfe_stream *stream, long long *result, struct category_id_val[i], scan_ret, stream->str_stream_info); } } - scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, result + hit_cnt + hit_cnt_fqdn, MAX_SCAN_RESULT - hit_cnt - hit_cnt_fqdn, &n_hit_result, scan_mid); if (scan_ret == MAAT_SCAN_HIT) @@ -170,6 +182,13 @@ int tfe_scan_ip_location(const struct tfe_stream *stream, long long *result, str TFE_LOG_INFO(logger, "Scan TSG_SECURITY_DESTINATION_LOCATION, NO hit location: %s scan ret: %d addr: %s", dst_ip_location, scan_ret, stream->str_stream_info); } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), tfe_bussiness_tableid_get(TABLE_SECURITY_DESTINATION_LOCATION), + result + hit_cnt + hit_cnt_ip, MAX_SCAN_RESULT - hit_cnt - hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + *location_server = (char *)ALLOC(char, strlen(dst_ip_location)); memcpy(*location_server,dst_ip_location,strlen(dst_ip_location)-1); } @@ -189,6 +208,13 @@ int tfe_scan_ip_location(const struct tfe_stream *stream, long long *result, str TFE_LOG_INFO(logger, "Scan TSG_SECURITY_SOURCE_LOCATION, NO hit location: %s scan ret: %d addr: %s", src_ip_location, scan_ret, stream->str_stream_info); } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), tfe_bussiness_tableid_get(TABLE_SECURITY_SOURCE_LOCATION), + result + hit_cnt + hit_cnt_ip, MAX_SCAN_RESULT - hit_cnt - hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + *location_client = (char *)ALLOC(char, strlen(src_ip_location)); memcpy(*location_client,src_ip_location, strlen(src_ip_location)-1); } @@ -250,6 +276,12 @@ int tfe_scan_ip_asn(const struct tfe_stream *stream, long long *result, struct m TFE_LOG_INFO(logger, "Scan TSG_SECURITY_DESTINATION_ASN, NO hit asn: %s scan ret: %d addr: %s", dst_asn, scan_ret, stream->str_stream_info); } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), tfe_bussiness_tableid_get(TABLE_SECURITY_DESTINATION_ASN), + result + hit_cnt + hit_cnt_ip, MAX_SCAN_RESULT - hit_cnt - hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } memset(buff, 0, sizeof(buff)); snprintf(buff, sizeof(buff), "%s(%s)", dst_asn, dst_org); *asn_server = tfe_strdup(buff); @@ -257,7 +289,7 @@ int tfe_scan_ip_asn(const struct tfe_stream *stream, long long *result, struct m if (strlen(src_asn)) { scan_ret = maat_scan_string((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), tfe_bussiness_tableid_get(TABLE_SECURITY_SOURCE_ASN), - src_asn, strlen(src_asn),result + hit_cnt + hit_cnt_ip, MAX_SCAN_RESULT - hit_cnt - hit_cnt_ip, + src_asn, strlen(src_asn), result + hit_cnt + hit_cnt_ip, MAX_SCAN_RESULT - hit_cnt - hit_cnt_ip, &n_hit_result, scan_mid); if (scan_ret == MAAT_SCAN_HIT) { @@ -270,10 +302,214 @@ int tfe_scan_ip_asn(const struct tfe_stream *stream, long long *result, struct m TFE_LOG_INFO(logger, "Scan ATTR_SOURCE_ASN, NO hit asn: %s scan ret: %d addr: %s", src_asn, scan_ret, stream->str_stream_info); } - + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), tfe_bussiness_tableid_get(TABLE_SECURITY_SOURCE_ASN), + result + hit_cnt + hit_cnt_ip, MAX_SCAN_RESULT - hit_cnt - hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } memset(buff, 0, sizeof(buff)); snprintf(buff, sizeof(buff), "%s(%s)", src_asn, src_org); *asn_client = tfe_strdup(buff); } return hit_cnt_ip; +} + +int tfe_scan_app_id(long long *result, struct maat_state *scan_mid, int hit_cnt, int app_id, int table_id) +{ + int scan_ret = 0; + int hit_app_id = 0; + size_t n_hit_result = 0; + + struct app_id_dict *app_dict = (struct app_id_dict*)maat_plugin_table_get_ex_data((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), tfe_bussiness_tableid_get(TABLE_OBJ_APP_ID_DICT), + (const char *)&app_id, sizeof(long long)); + if(app_dict!=NULL) + { + scan_ret = maat_scan_group((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, &app_dict->group_id, 1, result+hit_cnt+hit_app_id, + MAX_SCAN_RESULT-hit_cnt-hit_app_id, &n_hit_result, scan_mid); + if(scan_ret==MAAT_SCAN_HIT) + { + hit_app_id += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, result+hit_cnt+hit_app_id, MAX_SCAN_RESULT-hit_cnt-hit_app_id, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_app_id += n_hit_result; + } + app_id_dict_free(app_dict); + } + return hit_app_id; +} + +int tfe_scan_ipv4_addr(long long *result, struct maat_state *scan_mid, int hit_cnt, struct ipaddr sapp_addr) +{ + int table_id=0; + int scan_ret = 0; + int hit_cnt_ip = 0; + size_t n_hit_result = 0; + + table_id = maat_get_table_id((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), "ATTR_SOURCE_ADDR"); + scan_ret = maat_scan_ipv4((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, sapp_addr.v4->saddr, sapp_addr.v4->source, 6, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + + table_id = maat_get_table_id((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), "ATTR_DESTINATION_ADDR"); + scan_ret = maat_scan_ipv4((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, sapp_addr.v4->daddr, sapp_addr.v4->dest, 6, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if(scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + + return hit_cnt_ip; +} + +int tfe_scan_ipv6_addr(long long *result, struct maat_state *scan_mid, int hit_cnt, struct ipaddr sapp_addr) +{ + int table_id=0; + int scan_ret = 0; + int hit_cnt_ip = 0; + size_t n_hit_result = 0; + + table_id = maat_get_table_id((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), "ATTR_SOURCE_ADDR"); + scan_ret = maat_scan_ipv6((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, sapp_addr.v6->saddr, sapp_addr.v6->source, 6, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + table_id = maat_get_table_id((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), "ATTR_DESTINATION_ADDR"); + scan_ret = maat_scan_ipv6((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, sapp_addr.v6->daddr, sapp_addr.v6->dest, 6, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + return hit_cnt_ip; +} + +static int get_route_dir(const struct tfe_stream * stream) +{ + uint16_t out_size; + unsigned int route_dir; int ret=0; + + struct tfe_cmsg *cmsg = tfe_stream_get0_cmsg(stream); + if (cmsg != NULL) + { + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_COMMON_DIRECTION, (unsigned char *)&route_dir, sizeof(route_dir), &out_size); + if (ret != 0) + { + return ret; + } + } + return (route_dir==69) ? 0 : 1; +} + +int tfe_scan_ipv4_internal_addr(const struct tfe_stream *stream, long long *result, struct maat_state *scan_mid, int hit_cnt, struct ipaddr sapp_addr) +{ + int table_id=0; + int scan_ret = 0; + int hit_cnt_ip = 0; + size_t n_hit_result = 0; + + int dir_is_e2i = get_route_dir(stream); + int scan_internal_table_id=maat_get_table_id((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), "ATTR_INTERNAL_ADDR"); + int scan_external_table_id=maat_get_table_id((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), "ATTR_EXTERNAL_ADDR"); + + table_id = (dir_is_e2i == 1) ? scan_internal_table_id : scan_external_table_id; + scan_ret = maat_scan_ipv4((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, sapp_addr.v4->saddr, sapp_addr.v4->source, 6, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + + table_id = (dir_is_e2i == 0) ? scan_internal_table_id : scan_external_table_id; + scan_ret = maat_scan_ipv4((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, sapp_addr.v4->daddr, sapp_addr.v4->dest, 6, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if(scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + + return hit_cnt_ip; +} + +int tfe_scan_ipv6_internal_addr(const struct tfe_stream *stream, long long *result, struct maat_state *scan_mid, int hit_cnt, struct ipaddr sapp_addr) +{ + int table_id=0; + int scan_ret = 0; + int hit_cnt_ip = 0; + size_t n_hit_result = 0; + + int dir_is_e2i = get_route_dir(stream); + int scan_internal_table_id=maat_get_table_id((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), "ATTR_INTERNAL_ADDR"); + int scan_external_table_id=maat_get_table_id((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), "ATTR_EXTERNAL_ADDR"); + + table_id = (dir_is_e2i == 1) ? scan_internal_table_id : scan_external_table_id; + scan_ret = maat_scan_ipv6((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, sapp_addr.v6->saddr, sapp_addr.v6->source, 6, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + table_id = (dir_is_e2i == 0) ? scan_internal_table_id : scan_external_table_id; + scan_ret = maat_scan_ipv6((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, sapp_addr.v6->daddr, sapp_addr.v6->dest, 6, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + scan_ret = maat_scan_not_logic((struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT), table_id, + result+hit_cnt+hit_cnt_ip, MAX_SCAN_RESULT-hit_cnt-hit_cnt_ip, &n_hit_result, scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt_ip += n_hit_result; + } + return hit_cnt_ip; } \ No newline at end of file diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf index 2e369c0..1f4ba63 100644 --- a/conf/tfe/tfe.conf +++ b/conf/tfe/tfe.conf @@ -190,7 +190,8 @@ enable=1 vsystem_id=1 NIC_NAME=enp2s0 kafka_brokerlist=192.168.40.224:9092 -kafka_topic=PROXY-EVENT +logger_send_topic=PROXY-EVENT +file_bucket_topic=TRAFFIC-FILE-STREAM-RECORD sasl_username=admin sasl_passwd=galaxy2019 device_id_filepath=/opt/tsg/etc/tsg_sn.json diff --git a/platform/src/ssl_fetch_cert.cpp b/platform/src/ssl_fetch_cert.cpp index 9dd725b..98601ee 100644 --- a/platform/src/ssl_fetch_cert.cpp +++ b/platform/src/ssl_fetch_cert.cpp @@ -96,8 +96,8 @@ static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerpr cJSON_AddStringToObject(obj, "tfe_ip", g_kafka_logger->local_ip_str); dup = cJSON_Duplicate(obj, 1); msg = cJSON_PrintUnformatted(dup); - TFE_LOG_DEBUG(g_default_logger, "log to [%s] msg:%s", g_kafka_logger->topic_name, msg); - tfe_kafka_logger_send(g_kafka_logger, msg, strlen(msg)); + TFE_LOG_DEBUG(g_default_logger, "log to [%s] msg:%s", g_kafka_logger->topic_name[TOPIC_LOGGER], msg); + tfe_kafka_logger_send(g_kafka_logger, TOPIC_LOGGER, msg, strlen(msg)); free(msg); cJSON_Delete(dup); diff --git a/plugin/business/doh/src/doh.cpp b/plugin/business/doh/src/doh.cpp index f8b4ba9..b6c7ce2 100644 --- a/plugin/business/doh/src/doh.cpp +++ b/plugin/business/doh/src/doh.cpp @@ -27,15 +27,6 @@ struct doh_action_param pthread_mutex_t lock; }; -struct doh_app_id_dict -{ - int ref_cnt; - int app_id; - long long int group_id; - - pthread_mutex_t lock; -}; - struct dns_str2idx { int index; @@ -151,29 +142,6 @@ void doh_action_param_free_cb(int table_id, void **ad, long argl, void *argp) return; } -void doh_app_dict_table_free_cb(int table_id, void **ad, long argl, void* argp) -{ - if(*ad==NULL) - { - return; - } - - struct doh_app_id_dict *app_dict=(struct doh_app_id_dict *)(*ad); - pthread_mutex_lock(&(app_dict->lock)); - app_dict->ref_cnt--; - if(app_dict->ref_cnt>0) - { - pthread_mutex_unlock(&(app_dict->lock)); - return; - } - pthread_mutex_unlock(&(app_dict->lock)); - pthread_mutex_destroy(&(app_dict->lock)); - - FREE(&app_dict); - *ad=NULL; - return; -} - static void doh_get_cheat_data(long long p_result, int qtype, struct doh_ctx *ctx, const char *str_stream_info) { int i; @@ -339,6 +307,12 @@ static void doh_maat_scan(const struct tfe_stream *stream, const struct tfe_http TFE_LOG_INFO(g_doh_conf->local_logger, "Scan %s, NO hit host: %s scan ret: %d addr: %s", g_doh_conf->tables[TYPE_HOST].name, host, scan_ret, stream->str_stream_info); } + scan_ret = maat_scan_not_logic(g_doh_conf->maat, g_doh_conf->tables[TYPE_HOST].id, + result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt += n_hit_result; + } scan_ret = tfe_scan_fqdn_cat(stream, result, ctx->scan_mid, hit_cnt, g_doh_conf->local_logger, g_doh_conf->tables[TYPE_HOST_CAT].id); if( scan_ret > 0) @@ -351,61 +325,35 @@ static void doh_maat_scan(const struct tfe_stream *stream, const struct tfe_http doh_addr_tfe2sapp(stream->addr, &sapp_addr); if (sapp_addr.addrtype == ADDR_TYPE_IPV4) { - scan_ret = maat_scan_ipv4(g_doh_conf->maat, g_doh_conf->tables[TYPE_SRC_ADDR].id,sapp_addr.v4->saddr, - sapp_addr.v4->source, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (n_hit_result == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv4_addr(result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } - scan_ret = maat_scan_ipv4(g_doh_conf->maat, g_doh_conf->tables[TYPE_DST_ADDR].id,sapp_addr.v4->daddr, - sapp_addr.v4->dest, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - - if(scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv4_internal_addr(stream, result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } } if (sapp_addr.addrtype == ADDR_TYPE_IPV6) { - scan_ret = maat_scan_ipv6(g_doh_conf->maat, g_doh_conf->tables[TYPE_SRC_ADDR].id, sapp_addr.v6->saddr, - sapp_addr.v6->source, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv6_addr(result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } - scan_ret = maat_scan_ipv6(g_doh_conf->maat,g_doh_conf->tables[TYPE_DST_ADDR].id, sapp_addr.v6->daddr, - sapp_addr.v6->dest, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv6_internal_addr(stream, result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } } // scan appid - int table_id=maat_get_table_id(g_doh_conf->maat, "APP_ID_DICT"); - if(table_id < 0) + scan_ret = tfe_scan_app_id(result, ctx->scan_mid, hit_cnt, app_id, g_doh_conf->tables[TYPE_APPID].id); + if(scan_ret > 0) { - return; - } - struct doh_app_id_dict *app_dict = (struct doh_app_id_dict *)maat_plugin_table_get_ex_data(g_doh_conf->maat, table_id, (const char *)&app_id, sizeof(long long)); - if(app_dict!=NULL) - { - scan_ret = maat_scan_group(g_doh_conf->maat, g_doh_conf->tables[TYPE_APPID].id, &app_dict->group_id, 1, result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); - if(scan_ret==MAAT_SCAN_HIT) - { - TFE_LOG_INFO(g_doh_conf->local_logger, "Scan %s, Hit proto: %d scan ret: %d policy_id: %lld addr: %s", - g_doh_conf->tables[TYPE_APPID].name, app_id, scan_ret, result[hit_cnt], stream->str_stream_info); - hit_cnt += n_hit_result; - } - else - { - TFE_LOG_INFO(g_doh_conf->local_logger, "Scan %s, NO hit proto: %d scan ret: %d addr: %s", - g_doh_conf->tables[TYPE_APPID].name, app_id, scan_ret, stream->str_stream_info); - } - doh_app_dict_table_free_cb(0, (void **)&app_dict, 0, NULL); + hit_cnt += scan_ret; } // scan qname @@ -422,6 +370,12 @@ static void doh_maat_scan(const struct tfe_stream *stream, const struct tfe_http TFE_LOG_INFO(g_doh_conf->local_logger, "Scan %s, NO hit domain: %s scan ret: %d addr: %s", g_doh_conf->tables[TYPE_QNAME].name, qname, scan_ret, stream->str_stream_info); } + scan_ret = maat_scan_not_logic(g_doh_conf->maat, g_doh_conf->tables[TYPE_QNAME].id, + result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt += n_hit_result; + } if (hit_cnt) { @@ -444,6 +398,8 @@ static int doh_maat_init(const char *profile, const char *section) MESA_load_profile_string_def(profile, section, "table_qname", g_doh_conf->tables[TYPE_QNAME].name, TFE_STRING_MAX, "ATTR_DOH_QNAME"); MESA_load_profile_string_def(profile, section, "table_host", g_doh_conf->tables[TYPE_HOST].name, TFE_STRING_MAX, "ATTR_DOH_HOST"); MESA_load_profile_string_def(profile, section, "table_host_cat", g_doh_conf->tables[TYPE_HOST_CAT].name, TFE_STRING_MAX, "ATTR_DOH_HOST_CAT"); + MESA_load_profile_string_def(profile, section, "table_internal_addr", g_doh_conf->tables[TYPE_INTERNAL_ADDR].name, TFE_STRING_MAX, "ATTR_INTERNAL_ADDR"); + MESA_load_profile_string_def(profile, section, "table_external_addr", g_doh_conf->tables[TYPE_EXTERNAL_ADDR].name, TFE_STRING_MAX, "ATTR_EXTERNAL_ADDR"); for (int i = 0; i < TYPE_MAX; i++) { diff --git a/plugin/business/doh/src/logger.cpp b/plugin/business/doh/src/logger.cpp index b9f986c..9cbaab4 100644 --- a/plugin/business/doh/src/logger.cpp +++ b/plugin/business/doh/src/logger.cpp @@ -20,6 +20,8 @@ enum _log_action //Bigger action number is prior. __LG_ACTION_MAX }; +#define get_time_ms(tv) ((long long)(tv.tv_sec) * 1000 + (long long)(tv.tv_usec) / 1000) + static int get_rr_str2json(cJSON *object, dns_info_t *dns_info, int *dns_sec) { int i = 0; @@ -308,7 +310,7 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c char *log_payload = NULL; int kafka_status = 0; int send_cnt = 0; - time_t cur_time; + struct timeval cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN, INET_ADDRSTRLEN)] = {0}; char dst_ip_str[MAX(INET6_ADDRSTRLEN, INET_ADDRSTRLEN)] = {0}; @@ -326,10 +328,10 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c } common_obj = cJSON_CreateObject(); - cur_time = time(NULL); + gettimeofday(&cur_time, NULL); - cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", cur_time); - cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", cur_time); + cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(cur_time)); + cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time)); cJSON_AddStringToObject(common_obj, "doh_version", app_proto[http->major_version]); cJSON_AddStringToObject(common_obj, "decoded_as", "DoH"); @@ -469,7 +471,7 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); - kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload)); + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload)); free(log_payload); cJSON_Delete(per_hit_obj); if (kafka_status < 0) diff --git a/plugin/business/doh/src/pub.h b/plugin/business/doh/src/pub.h index 6606312..6f830a2 100644 --- a/plugin/business/doh/src/pub.h +++ b/plugin/business/doh/src/pub.h @@ -36,6 +36,8 @@ enum table_type TYPE_QNAME, TYPE_HOST, TYPE_HOST_CAT, + TYPE_INTERNAL_ADDR, + TYPE_EXTERNAL_ADDR, TYPE_MAX }; diff --git a/plugin/business/traffic-mirror/src/entry.cpp b/plugin/business/traffic-mirror/src/entry.cpp index 3f34c7d..cf00a2b 100644 --- a/plugin/business/traffic-mirror/src/entry.cpp +++ b/plugin/business/traffic-mirror/src/entry.cpp @@ -233,7 +233,7 @@ void profile_table_ex_data_new_cb(const char *table_name, int table_id, const ch goto ignore; } - TFE_LOG_DEBUG(instance->logger, "traffic mirror profile %s: vlan id[%d]£º %d", key, iter, vlan_in_number); + TFE_LOG_DEBUG(instance->logger, "traffic mirror profile %s: vlan id[%d] %d", key, iter, vlan_in_number); ex_data->rewrite_vlan = 1; ex_data->vlans[iter] = vlan_in_number; ex_data->ether_addrs[iter] = ether_addr_broadcast; diff --git a/plugin/business/tsg-http/src/tsg_http.cpp b/plugin/business/tsg-http/src/tsg_http.cpp index fe61ec2..e2644e2 100644 --- a/plugin/business/tsg-http/src/tsg_http.cpp +++ b/plugin/business/tsg-http/src/tsg_http.cpp @@ -75,6 +75,8 @@ enum scan_table PXY_CTRL_HTTP_RES_HDR, PXY_CTRL_HTTP_RES_BODY, PXY_CTRL_APP_ID, + PXY_CTRL_INTERNAL_ADDR, + PXY_CTRL_EXTERNAL_ADDR, __SCAN_TABLE_MAX }; @@ -103,19 +105,9 @@ enum manipulate_profile_table POLICY_PROFILE_TABLE_INSERT, POLICY_PROFILE_TABLE_HIJACK, POLICY_PROFILE_TABLE_LUA, - POLICY_PROFILE_TABLE_APP_ID, POLICY_PROFILE_TABLE_MAX }; -struct app_id_dict -{ - int ref_cnt; - int app_id; - long long int group_id; - - pthread_mutex_t lock; -}; - struct manipulate_profile { int profile_id; @@ -968,76 +960,6 @@ void ma_profile_table_dup_cb(int table_id, void **to, void **from, long argl, vo *to=ply_obj; } -void app_dict_table_new_cb(const char *table_name, int table_id, const char* key, const char* table_line, void **ad, long argl, void* argp) -{ - int ret=0; - size_t offset=0, len=0; - char *app_id_str=NULL, *group_id_str=NULL; - struct app_id_dict *app_dict=ALLOC(struct app_id_dict, 1); - - ret = maat_helper_read_column(table_line, 1, &offset, &len); - if(ret >= 0) - { - app_id_str=ALLOC(char, len+1); - memcpy(app_id_str, table_line+offset, len); - app_dict->app_id=atoi(app_id_str); - FREE(&app_id_str); - } - - ret = maat_helper_read_column(table_line, 18, &offset, &len); - if(ret >= 0) - { - group_id_str=ALLOC(char, len+1); - memcpy(group_id_str, table_line+offset, len); - app_dict->group_id=atoll(group_id_str); - FREE(&group_id_str); - } - - app_dict->ref_cnt=1; - pthread_mutex_init(&(app_dict->lock), NULL); - *ad=app_dict; - return; -} - -void app_dict_table_free_cb(int table_id, void **ad, long argl, void* argp) -{ - if(*ad==NULL) - { - return; - } - - struct app_id_dict *app_dict=(struct app_id_dict *)(*ad); - pthread_mutex_lock(&(app_dict->lock)); - app_dict->ref_cnt--; - if(app_dict->ref_cnt>0) - { - pthread_mutex_unlock(&(app_dict->lock)); - return; - } - pthread_mutex_unlock(&(app_dict->lock)); - pthread_mutex_destroy(&(app_dict->lock)); - - FREE(&app_dict); - *ad=NULL; - return; -} - -void app_id_dict_free(struct app_id_dict *app_dict) -{ - app_dict_table_free_cb(0, (void **)&app_dict, 0, NULL); -} - -void app_dict_table_dup_cb(int table_id, void **to, void **from, long argl, void* argp) -{ - struct app_id_dict *app_dict=(struct app_id_dict *)(*from); - pthread_mutex_lock(&(app_dict->lock)); - app_dict->ref_cnt++; - pthread_mutex_unlock(&(app_dict->lock)); - *to=app_dict; - - return; -} - int maat_table_init(const char* table_name, maat_start_callback_t *start, maat_update_callback_t *update, maat_finish_callback_t *finish, void *u_para) @@ -1107,6 +1029,9 @@ int proxy_policy_init(const char* profile_path, const char* static_section, cons table_name[PXY_CTRL_HTTP_RES_HDR] = "ATTR_HTTP_RES_HDR"; table_name[PXY_CTRL_HTTP_RES_BODY] = "ATTR_HTTP_RES_BODY"; table_name[PXY_CTRL_APP_ID] = "ATTR_APP_ID"; + table_name[PXY_CTRL_INTERNAL_ADDR] = "ATTR_INTERNAL_ADDR"; + table_name[PXY_CTRL_EXTERNAL_ADDR] = "ATTR_EXTERNAL_ADDR"; + for (int i = 0; i < __SCAN_TABLE_MAX; i++) { g_proxy_rt->scan_table_id[i] = maat_get_table_id(g_proxy_rt->feather, table_name[i]); @@ -1124,13 +1049,6 @@ int proxy_policy_init(const char* profile_path, const char* static_section, cons policy_action_param_dup, 0, NULL); - g_proxy_rt->plolicy_table_id[POLICY_PROFILE_TABLE_APP_ID]=maat_get_table_id(g_proxy_rt->feather, "APP_ID_DICT"); - maat_plugin_table_ex_schema_register(g_proxy_rt->feather, "APP_ID_DICT", - app_dict_table_new_cb, - app_dict_table_free_cb, - app_dict_table_dup_cb, - 0, NULL); - ret = maat_table_init("PXY_PROFILE_TRUSTED_CA_CERT", trusted_CA_update_start_cb, trusted_CA_update_cert_cb, @@ -2783,6 +2701,12 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht { hit_cnt += n_hit_result; } + scan_ret = maat_scan_not_logic(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_FQDN], + result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt += n_hit_result; + } scan_ret = tfe_scan_fqdn_cat(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_FQDN_CAT]); if (scan_ret > 0) { @@ -2792,7 +2716,6 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht const char * str_url = session->req->req_spec.url; int str_url_length = (int) (strlen(session->req->req_spec.url)); - scan_ret = maat_scan_string(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_URL], str_url, str_url_length, result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); @@ -2800,6 +2723,12 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht { hit_cnt += n_hit_result; } + scan_ret = maat_scan_not_logic(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_URL], + result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt += n_hit_result; + } } if ((events & EV_HTTP_REQ_HDR) || (events & EV_HTTP_RESP_HDR)) @@ -2817,7 +2746,6 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht const char * str_field_name = http_field_name_to_string(&field_name); scan_ret = maat_state_set_scan_district(ctx->scan_mid, table_id, str_field_name, strlen(str_field_name)); - assert(scan_ret == 0); scan_ret = maat_scan_string(g_proxy_rt->feather, table_id, field_val, strlen(field_val), result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); @@ -3190,8 +3118,7 @@ void cache_write(const struct tfe_http_session * session, enum tfe_http_event ev } } -void proxy_on_http_begin(const struct tfe_stream * stream, - const struct tfe_http_session * session, unsigned int thread_id, void ** pme) +void proxy_on_http_begin(const struct tfe_stream *stream, const struct tfe_http_session *session, unsigned int thread_id, void **pme) { if (!g_proxy_rt->enable_plugin) { @@ -3205,73 +3132,58 @@ void proxy_on_http_begin(const struct tfe_stream * stream, ATOMIC_INC(&(g_proxy_rt->stat_val[STAT_SESSION])); ctx = proxy_http_ctx_new(thread_id); long long *result = ctx->result; - size_t n_hit_result=0; scan_ret = tfe_scan_subscribe_id(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger); if(scan_ret>0) { hit_cnt+=scan_ret; } - scan_ret = tfe_scan_ip_location(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, &(ctx->ip_ctx.location_server), &(ctx->ip_ctx.location_client)); if(scan_ret>0) { hit_cnt+=scan_ret; } + scan_ret = tfe_scan_ip_asn(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, &(ctx->ip_ctx.asn_server), &(ctx->ip_ctx.asn_client)); if(scan_ret>0) { hit_cnt+=scan_ret; } - long long app_id=67; - struct app_id_dict *app_dict = (struct app_id_dict*)maat_plugin_table_get_ex_data(g_proxy_rt->feather, g_proxy_rt->plolicy_table_id[POLICY_PROFILE_TABLE_APP_ID], (const char *)&app_id, sizeof(long long)); - if(app_dict!=NULL) + scan_ret = tfe_scan_app_id(result, ctx->scan_mid, hit_cnt, app_id, g_proxy_rt->scan_table_id[PXY_CTRL_APP_ID]); + if(scan_ret > 0) { - scan_ret = maat_scan_group(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_APP_ID], &app_dict->group_id, 1, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, &n_hit_result, ctx->scan_mid); - if(scan_ret==MAAT_SCAN_HIT) - { - hit_cnt+=n_hit_result; - } - app_id_dict_free(app_dict); + hit_cnt += scan_ret; } addr_tfe2sapp(stream->addr, &sapp_addr); if (sapp_addr.addrtype == ADDR_TYPE_IPV4) { - scan_ret = maat_scan_ipv4(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_SOURCE_ADDR], - sapp_addr.v4->saddr, sapp_addr.v4->source, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv4_addr(result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } - scan_ret = maat_scan_ipv4(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_DESTINATION_ADDR], - sapp_addr.v4->daddr, sapp_addr.v4->dest, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - - if(scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv4_internal_addr(stream, result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } } if (sapp_addr.addrtype == ADDR_TYPE_IPV6) { - scan_ret = maat_scan_ipv6(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_SOURCE_ADDR], - sapp_addr.v6->saddr, sapp_addr.v6->source, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv6_addr(result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } - scan_ret = maat_scan_ipv6(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_DESTINATION_ADDR], - sapp_addr.v6->daddr, sapp_addr.v6->dest, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv6_internal_addr(stream, result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } } + if(hit_cnt > 0) { ctx->hit_cnt = hit_cnt; diff --git a/plugin/business/tsg-http/src/tsg_logger.cpp b/plugin/business/tsg-http/src/tsg_logger.cpp index 8114e0f..a5e8b26 100644 --- a/plugin/business/tsg-http/src/tsg_logger.cpp +++ b/plugin/business/tsg-http/src/tsg_logger.cpp @@ -5,6 +5,7 @@ #include #include +#include "mpack.h" #include "tsg_proxy_logger.h" struct json_spec @@ -55,7 +56,57 @@ void get_http_body_uuid(char *uuid) return; } -struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) +size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body) +{ + int kafka_status=0; + mpack_writer_t writer; + char *mpack_data=NULL, *data=NULL; + size_t mpack_size=0, datalen=0; + + mpack_writer_init_growable(&writer, &mpack_data, &mpack_size); + mpack_build_map(&writer); + + mpack_write_cstr(&writer, "uuid"); + mpack_write_cstr(&writer, uuid); + mpack_write_cstr(&writer, "fileType"); + mpack_write_cstr(&writer, "txt"); + mpack_write_cstr(&writer, "combineMode"); + mpack_write_cstr(&writer, "seek"); + mpack_write_cstr(&writer, "offset"); + mpack_write_u64(&writer, 0); + mpack_write_cstr(&writer, "lastChunkFlag"); + mpack_write_u32(&writer, 1); + datalen = evbuffer_get_length(http_body); + if(datalen > 0) + { + data = (char *)evbuffer_pullup(http_body, datalen); + mpack_write_cstr(&writer, "chunk"); + mpack_start_bin(&writer, datalen); + mpack_write_bytes(&writer, (const char *)data, datalen); + mpack_finish_bin(&writer); + } + mpack_write_cstr(&writer, "length"); + mpack_write_u64(&writer, datalen); + mpack_complete_map(&writer); // mpack_init_map + mpack_error_t errorno=mpack_writer_destroy(&writer); + if(errorno!=mpack_ok) + { + TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid); + } + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_BUCKET, mpack_data, mpack_size); + if(kafka_status<0) + { + TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); + } + + free(mpack_data); + mpack_data = NULL; + mpack_size = 0; + + return datalen; +} + +struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) { struct tango_cache_parameter *log_file_upload_para=NULL; struct proxy_logger* instance=ALLOC(struct proxy_logger,1); @@ -157,12 +208,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) common_obj=cJSON_CreateObject(); gettimeofday(&cur_time, NULL); - cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(http->start_time)); cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time)); - cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); - cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); - unsigned int category_id_val[64]={0}; char opt_val[24]={0}; uint16_t opt_out_size; @@ -236,6 +283,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) s2c_byte_num = log_msg->s2c_byte_num; } + cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); + cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); cJSON_AddNumberToObject(common_obj, "out_link_id", 0); cJSON_AddNumberToObject(common_obj, "in_link_id", 0); cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str); @@ -271,18 +320,12 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) #define FILE_CHUNK_UUID_LEN 40 char uuid[FILE_CHUNK_UUID_LEN]={0}; + size_t datalen=0; for(size_t i=0; iresult_num; i++) { - if(log_msg->result[i].do_log!=1) - { - continue; - } - - if(handle->en_hoslog!=1) - { - continue; - } + if(log_msg->result[i].do_log!=1) continue; + if(handle->en_hoslog!=1) continue; if(log_msg->req_body!=NULL) { @@ -293,7 +336,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) else { get_http_body_uuid(uuid); - cJSON_AddStringToObject(common_obj, "http_request_body", uuid); + datalen=file_bucket_upload_once(handle, uuid, log_msg->req_body); + if(datalen>0) + { + cJSON_AddStringToObject(common_obj, "http_request_body", uuid); + } + else + { + TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed."); + } } } if(log_msg->resp_body!=NULL) @@ -305,7 +356,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) else { get_http_body_uuid(uuid); - cJSON_AddStringToObject(common_obj, "http_response_body", uuid); + datalen=file_bucket_upload_once(handle, uuid, log_msg->resp_body); + if(datalen>0) + { + cJSON_AddStringToObject(common_obj, "http_response_body", uuid); + } + else + { + TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed."); + } } } } @@ -361,7 +420,7 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); - kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload)); + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload)); free(log_payload); cJSON_Delete(per_hit_obj); if(kafka_status<0) diff --git a/resource/pangu/table_info.conf b/resource/pangu/table_info.conf index f6ed2a9..d755449 100644 --- a/resource/pangu/table_info.conf +++ b/resource/pangu/table_info.conf @@ -530,5 +530,17 @@ "table_name":"ATTR_SUBSCRIBER_ID", "table_type":"virtual", "physical_table": "TSG_OBJ_SUBSCRIBER_ID" + }, + { + "table_id":52, + "table_name":"ATTR_INTERNAL_ADDR", + "table_type":"virtual", + "physical_table": "TSG_OBJ_IP" + }, + { + "table_id":53, + "table_name":"ATTR_EXTERNAL_ADDR", + "table_type":"virtual", + "physical_table": "TSG_OBJ_IP" } ] \ No newline at end of file