diff --git a/ci/travis.sh b/ci/travis.sh index 958507c..faa41bf 100644 --- a/ci/travis.sh +++ b/ci/travis.sh @@ -34,7 +34,7 @@ env | sort # Install dependency from YUM yum install -y mrzcpd numactl-devel zlib-devel librdkafka-devel systemd-devel -yum install -y libcjson-devel libmaat4-devel libMESA_field_stat2-devel libMESA_handle_logger-devel libelua-devel +yum install -y libcjson-devel libmaat4-devel libMESA_field_stat2-devel libfieldstat3-devel libMESA_handle_logger-devel libelua-devel yum install -y libMESA_htable-devel libMESA_prof_load-devel libwiredcfg-devel libWiredLB-devel sapp-devel libbreakpad_mini-devel yum install -y libasan diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 0b3d516..4bb1b70 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,7 +1,7 @@ add_library( common src/tfe_utils.cpp src/tfe_types.cpp src/tfe_future.cpp src/tfe_http.cpp src/tfe_plugin.cpp src/tfe_rpc.cpp src/tfe_cmsg.cpp src/tfe_kafka_logger.cpp src/tfe_resource.cpp src/tfe_scan.cpp - src/tfe_pkt_util.cpp src/tfe_tcp_restore.cpp src/raw_socket.cpp src/packet_construct.cpp + src/tfe_pkt_util.cpp src/tfe_tcp_restore.cpp src/raw_socket.cpp src/packet_construct.cpp src/tfe_fieldstat.cpp src/tap.cpp src/io_uring.cpp src/intercept_policy.cpp) target_include_directories(common PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include) target_link_libraries(common PUBLIC libevent-static libevent-static-openssl libevent-static-pthreads rdkafka) diff --git a/common/include/tfe_fieldstat.h b/common/include/tfe_fieldstat.h new file mode 100644 index 0000000..5f1fb49 --- /dev/null +++ b/common/include/tfe_fieldstat.h @@ -0,0 +1,49 @@ +#ifndef _TFE_FIELDSTAT_METRIC_H +#define _TFE_FIELDSTAT_METRIC_H + +#ifdef __cpluscplus +extern "C" +{ +#endif + +#include +#include + +enum metric_columns_index +{ + COLUMN_HIT_COUNT = 0, + COLUMN_IN_BYTES, + COLUMN_OUT_BYTES, + COLUMN_IN_PKTS, + COLUMN_OUT_PKTS, + COLUMN_MAX +}; + +enum metric_tags_index +{ + TAG_RULE_ID = 0, + TAG_PINNING_STATUS, + TAG_ACTION, + TAG_SUB_ACTION, + TAG_MAX +}; + +struct tfe_fieldstat_metric_t +{ + int table_id; + int max_thread; + struct fieldstat_tag **tags; + unsigned int column_array[COLUMN_MAX]; + struct fieldstat_dynamic_instance *instance; +}; + +int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int thread_id); +struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger); +void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat); + +#ifdef __cpluscplus +} +#endif + +#endif + diff --git a/common/src/tfe_fieldstat.cpp b/common/src/tfe_fieldstat.cpp new file mode 100644 index 0000000..e46a2c0 --- /dev/null +++ b/common/src/tfe_fieldstat.cpp @@ -0,0 +1,69 @@ +#include +#include + +int tfe_fieldstat_metric_incrby(struct tfe_fieldstat_metric_t *fieldstat, unsigned int column_id, long long value, const struct fieldstat_tag tags[], int thread_id) +{ + return fieldstat_dynamic_table_metric_value_incrby(fieldstat->instance, fieldstat->table_id, column_id, "proxy_rule_hits", value, tags, (size_t)TAG_MAX, thread_id); +} + +struct tfe_fieldstat_metric_t *tfe_fieldstat_metric_create(char *telegraf_ip, int telegraf_port, char *app_name, int cycle, int max_thread, void *local_logger) +{ + int i=0; + + struct fieldstat_tag metric_tags[TAG_MAX] = {{"rule_id", 0, -1}, {"pinning_status", 0, -1 }, {"action", 0, -1}, {"sub_action", 2, -1} }; + const char *column_field[COLUMN_MAX] = {"hit_count", "in_bytes", "out_bytes", "in_pkts", "out_pkts"}; + enum field_type column_type[COLUMN_MAX] = {FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER, FIELD_TYPE_COUNTER}; + + struct tfe_fieldstat_metric_t *fieldstat = ALLOC(struct tfe_fieldstat_metric_t, 1); + + fieldstat->instance = fieldstat_dynamic_instance_new(app_name, max_thread); + if(!fieldstat->instance) + { + TFE_LOG_ERROR(local_logger, "fieldstat3 dynamic instance init failed."); + return NULL; + } + + fieldstat->max_thread=max_thread; + fieldstat_dynamic_set_line_protocol_server(fieldstat->instance, telegraf_ip, telegraf_port); + fieldstat_dynamic_set_output_interval(fieldstat->instance, cycle); + + fieldstat->table_id = fieldstat_register_dynamic_table(fieldstat->instance, "proxy_rule_hits", column_field, column_type, (size_t)COLUMN_MAX, fieldstat->column_array); + if(fieldstat->table_id < 0) + { + TFE_LOG_ERROR(local_logger, "fieldstat3 register dynamic table failed."); + FREE(&fieldstat); + return NULL; + } + + fieldstat->tags = ALLOC(struct fieldstat_tag*, max_thread); + for (i = 0; i < max_thread; i++) + { + fieldstat->tags[i] = ALLOC(struct fieldstat_tag, TAG_MAX); + memcpy(fieldstat->tags[i], metric_tags, sizeof(struct fieldstat_tag) * (size_t)TAG_MAX); + } + + fieldstat_dynamic_instance_start(fieldstat->instance); + return fieldstat; +} + +void tfe_fieldstat_metric_destroy(struct tfe_fieldstat_metric_t *fieldstat) +{ + if(fieldstat) + { + if(fieldstat->instance) + { + fieldstat_dynamic_instance_free(fieldstat->instance); + } + + for (int i = 0; i < fieldstat->max_thread; i++) + { + if (fieldstat->tags[i]) + { + FREE(&fieldstat->tags[i]); + } + } + FREE(&fieldstat->tags); + FREE(&fieldstat); + } +} + diff --git a/common/src/tfe_resource.cpp b/common/src/tfe_resource.cpp index 356827e..cf1f067 100644 --- a/common/src/tfe_resource.cpp +++ b/common/src/tfe_resource.cpp @@ -116,6 +116,7 @@ static struct maat *create_maat_feather(const char *instance_name, const char *p if (maat_stat_on) { maat_options_set_stat_on(opts); + maat_options_set_stat_file(opts, maat_stat_file); if (maat_perf_on) { maat_options_set_perf_on(opts); diff --git a/conf/doh/doh.conf b/conf/doh/doh.conf index 97b2173..2e34a1f 100644 --- a/conf/doh/doh.conf +++ b/conf/doh/doh.conf @@ -1,6 +1,12 @@ [doh] enable=1 +[proxy_hits] +cycle=1000 +telegraf_port=8400 +telegraf_ip=127.0.0.1 +app_name="proxy_rule_hits" + [maat] table_appid=TSG_OBJ_APP_ID table_addr=TSG_SECURITY_ADDR diff --git a/conf/pangu/pangu_pxy.conf b/conf/pangu/pangu_pxy.conf index 414ddf6..0ddafa5 100644 --- a/conf/pangu/pangu_pxy.conf +++ b/conf/pangu/pangu_pxy.conf @@ -1,6 +1,12 @@ [debug] enable_plugin=1 +[proxy_hits] +cycle=1000 +telegraf_port=8400 +telegraf_ip=127.0.0.1 +app_name="proxy_rule_hits" + [log] entrance_id=0 # default 1, if enable "en_sendlog", the iterm "tfe.conf [kafka] enable" must set 1 diff --git a/platform/CMakeLists.txt b/platform/CMakeLists.txt index d0c2b17..083def5 100644 --- a/platform/CMakeLists.txt +++ b/platform/CMakeLists.txt @@ -25,6 +25,7 @@ target_link_libraries(tfe pthread dl nfnetlink hiredis-static MESA_htable wiredcfg MESA_field_stat + fieldstat3 breakpad_mini ${SYSTEMD_LIBRARIES}) diff --git a/plugin/business/doh/src/doh.cpp b/plugin/business/doh/src/doh.cpp index 5d0e79e..c0ba0a8 100644 --- a/plugin/business/doh/src/doh.cpp +++ b/plugin/business/doh/src/doh.cpp @@ -1,6 +1,7 @@ #include "logger.h" #include #include +#include extern void increase_redirect_policy_hit_num(void); @@ -661,6 +662,21 @@ static void doh_process_req(const struct tfe_stream *stream, const struct tfe_ht } } +struct tfe_fieldstat_metric_t *doh_fieldstat_init(const char* profile, const char *section, int max_thread) +{ + int cycle=0; + unsigned short telegraf_port=0; + char telegraf_ip[TFE_STRING_MAX]={0}; + char app_name[TFE_STRING_MAX]={0}; + + MESA_load_profile_short_nodef(profile, section, "telegraf_port", (short *)&(telegraf_port)); + MESA_load_profile_string_nodef(profile, section, "telegraf_ip", telegraf_ip, sizeof(telegraf_ip)); + MESA_load_profile_string_def(profile, section, "app_name", app_name, sizeof(app_name), "metric"); + MESA_load_profile_int_def(profile, section, "cycle", &cycle, 1000); + + return tfe_fieldstat_metric_create(telegraf_ip, telegraf_port, app_name, cycle, max_thread, g_doh_conf->local_logger); +} + int doh_on_init(struct tfe_proxy *proxy) { const char *profile = "./conf/doh/doh.conf"; @@ -679,6 +695,7 @@ int doh_on_init(struct tfe_proxy *proxy) g_doh_conf->thread_num = tfe_proxy_get_work_thread_count(); g_doh_conf->local_logger = MESA_create_runtime_log_handle("doh", RLOG_LV_DEBUG); + g_doh_conf->fieldstat = doh_fieldstat_init(profile, "proxy_hits", g_doh_conf->thread_num); g_doh_conf->gc_evbase = tfe_proxy_get_gc_evbase(); g_doh_conf->fs_handle = tfe_proxy_get_fs_handle(); @@ -856,6 +873,23 @@ int doh_on_data(const struct tfe_stream *stream, const struct tfe_http_session * return NO_CALL_NEXT_PLUGIN; } +void doh_send_metric_log(const struct tfe_stream * stream, struct doh_ctx *ctx, unsigned int thread_id) +{ + size_t c2s_byte_num = 0, s2c_byte_num =0; + struct tfe_fieldstat_metric_t *fieldstat = g_doh_conf->fieldstat; + + fieldstat->tags[thread_id][TAG_RULE_ID].value_int = ctx->result->config_id; + fieldstat->tags[thread_id][TAG_ACTION].value_int = 48; + fieldstat->tags[thread_id][TAG_SUB_ACTION].value_str = "redirect"; + + tfe_stream_info_get(stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &c2s_byte_num, sizeof(c2s_byte_num)); + tfe_stream_info_get(stream, INFO_FROM_UPSTREAM_RX_OFFSET, &s2c_byte_num, sizeof(s2c_byte_num)); + tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_HIT_COUNT], 1, fieldstat->tags[thread_id], thread_id); + tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_IN_BYTES], c2s_byte_num, fieldstat->tags[thread_id], thread_id); + tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_OUT_BYTES], s2c_byte_num, fieldstat->tags[thread_id], thread_id); + return; +} + void doh_on_end(const struct tfe_stream *stream, const struct tfe_http_session *session, unsigned int thread_id, void **pme) { if (!g_doh_conf->enable) @@ -869,6 +903,7 @@ void doh_on_end(const struct tfe_stream *stream, const struct tfe_http_session * int ret = doh_send_log(g_doh_conf, session, stream, ctx); if (ret > 0) { + doh_send_metric_log(stream, ctx, thread_id); ATOMIC_ADD(&(g_doh_conf->stat_val[STAT_LOG_NUM]), ret); } } diff --git a/plugin/business/doh/src/pub.h b/plugin/business/doh/src/pub.h index 6606312..dd74761 100644 --- a/plugin/business/doh/src/pub.h +++ b/plugin/business/doh/src/pub.h @@ -69,6 +69,7 @@ struct doh_conf struct event *gcev; struct event_base *gc_evbase; screen_stat_handle_t fs_handle; + struct tfe_fieldstat_metric_t *fieldstat; struct maat *maat; struct maat_table tables[TYPE_MAX]; diff --git a/plugin/business/traffic-mirror/src/entry.cpp b/plugin/business/traffic-mirror/src/entry.cpp index de81065..abe7149 100644 --- a/plugin/business/traffic-mirror/src/entry.cpp +++ b/plugin/business/traffic-mirror/src/entry.cpp @@ -367,6 +367,7 @@ static struct maat* maat_feather_create_with_override(const char * instance_name if (maat_stat_on) { maat_options_set_stat_on(opts); + maat_options_set_stat_file(opts, maat_stat_file); if (maat_perf_on) { maat_options_set_perf_on(opts); diff --git a/plugin/business/tsg-http/src/tsg_http.cpp b/plugin/business/tsg-http/src/tsg_http.cpp index dac1f35..fd788bd 100644 --- a/plugin/business/tsg-http/src/tsg_http.cpp +++ b/plugin/business/tsg-http/src/tsg_http.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -158,8 +159,6 @@ struct tsg_proxy_rt int scan_table_id[__SCAN_TABLE_MAX]; int plolicy_table_id[POLICY_PROFILE_TABLE_MAX]; ctemplate::Template * tpl_403, * tpl_404, * tpl_451; - char * reject_page; - int page_size; long long suspend_max; int cache_enabled; @@ -171,6 +170,7 @@ struct tsg_proxy_rt struct event_base* gc_evbase; struct event* gcev; + struct tfe_fieldstat_metric_t *fieldstat; struct tsg_lua_script lua_script; Ratelimiter_handle_t ratelimiter; int enable_rate; @@ -192,6 +192,21 @@ static void proxy_http_gc_cb(evutil_socket_t fd, short what, void * arg) return; } +struct tfe_fieldstat_metric_t *proxy_fieldstat_init(const char* profile_path, const char *section, int max_thread) +{ + int cycle=0; + unsigned short telegraf_port=0; + char telegraf_ip[TFE_STRING_MAX]={0}; + char app_name[TFE_STRING_MAX]={0}; + + MESA_load_profile_short_nodef(profile_path, section, "telegraf_port", (short *)&(telegraf_port)); + MESA_load_profile_string_nodef(profile_path, section, "telegraf_ip", telegraf_ip, sizeof(telegraf_ip)); + MESA_load_profile_string_def(profile_path, section, "app_name", app_name, sizeof(app_name), "metric"); + MESA_load_profile_int_def(profile_path, section, "cycle", &cycle, 1000); + + return tfe_fieldstat_metric_create(telegraf_ip, telegraf_port, app_name, cycle, max_thread, g_proxy_rt->local_logger); +} + static void proxy_http_stat_init(struct tsg_proxy_rt * pangu_runtime) { int i=0; @@ -1169,12 +1184,12 @@ int proxy_http_init(struct tfe_proxy * proxy) { goto error_out; } - g_proxy_rt->fs_handle = tfe_proxy_get_fs_handle(); - g_proxy_rt->ratelimiter=ratelimit_handle_create(profile_path, "ratelimit"); - proxy_http_stat_init(g_proxy_rt); + g_proxy_rt->ratelimiter=ratelimit_handle_create(profile_path, "ratelimit"); + g_proxy_rt->fieldstat=proxy_fieldstat_init(profile_path, "proxy_hits", g_proxy_rt->thread_num); + if(http_lua_handle_create(&g_proxy_rt->lua_script, g_proxy_rt->thread_num, "tfe") <0) { goto error_out; @@ -3261,6 +3276,39 @@ static inline int ctx_actually_manipulate(struct proxy_http_ctx * ctx) } } +void proxy_send_metric_log(const struct tfe_stream * stream, struct proxy_http_ctx * ctx, unsigned int thread_id) +{ + size_t i=0; + const char *proxy_action_map[__PX_ACTION_MAX]; + proxy_action_map[PX_ACTION_MONIT]="monitor"; + proxy_action_map[PX_ACTION_REJECT]="deny"; + proxy_action_map[PX_ACTION_WHITELIST]="allow"; + const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert","edit_element","run_script"}; + struct tfe_fieldstat_metric_t *fieldstat = g_proxy_rt->fieldstat; + + for(i=0; i< ctx->n_enforce; i++) + { + fieldstat->tags[thread_id][TAG_RULE_ID].value_int = ctx->enforce_rules[i].config_id; + fieldstat->tags[thread_id][TAG_ACTION].value_int = ctx->enforce_rules[i].action; + if(ctx->enforce_rules[i].action == PX_ACTION_MANIPULATE) + { + fieldstat->tags[thread_id][TAG_SUB_ACTION].value_str = manipulate_action_map[ctx->param->action]; + } + else + { + fieldstat->tags[thread_id][TAG_SUB_ACTION].value_str = proxy_action_map[ctx->enforce_rules[i].action]; + } + + size_t c2s_byte_num = 0, s2c_byte_num =0; + tfe_stream_info_get(stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &c2s_byte_num, sizeof(c2s_byte_num)); + tfe_stream_info_get(stream, INFO_FROM_UPSTREAM_RX_OFFSET, &s2c_byte_num, sizeof(s2c_byte_num)); + tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_HIT_COUNT], 1, fieldstat->tags[thread_id], thread_id); + tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_IN_BYTES], c2s_byte_num, fieldstat->tags[thread_id], thread_id); + tfe_fieldstat_metric_incrby(fieldstat, fieldstat->column_array[COLUMN_OUT_BYTES], s2c_byte_num, fieldstat->tags[thread_id], thread_id); + } + return; +} + void proxy_on_http_end(const struct tfe_stream * stream, const struct tfe_http_session * session, unsigned int thread_id, void ** pme) { @@ -3317,6 +3365,7 @@ void proxy_on_http_end(const struct tfe_stream * stream, ATOMIC_INC(&(g_proxy_rt->stat_val[STAT_ACTION_MONIT])); } } + proxy_send_metric_log(stream, ctx, thread_id); } if(ctx->rep_ctx && ctx->rep_ctx->actually_replaced==1) diff --git a/vendor/CMakeLists.txt b/vendor/CMakeLists.txt index b35d926..968182e 100644 --- a/vendor/CMakeLists.txt +++ b/vendor/CMakeLists.txt @@ -169,6 +169,10 @@ add_library(MESA_field_stat SHARED IMPORTED GLOBAL) set_property(TARGET MESA_field_stat PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libMESA_field_stat2.so) set_property(TARGET MESA_field_stat PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) +add_library(fieldstat3 SHARED IMPORTED GLOBAL) +set_property(TARGET fieldstat3 PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/libfieldstat3.so) +set_property(TARGET fieldstat3 PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}) + add_library(rdkafka SHARED IMPORTED GLOBAL) set_property(TARGET rdkafka PROPERTY IMPORTED_LOCATION ${MESA_FRAMEWORK_LIB_DIR}/librdkafka.so) set_property(TARGET rdkafka PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${MESA_FRAMEWORK_INCLUDE_DIR}/MESA)