diff --git a/common/include/tfe_cmsg.h b/common/include/tfe_cmsg.h index 9b32a44..543d0ba 100644 --- a/common/include/tfe_cmsg.h +++ b/common/include/tfe_cmsg.h @@ -115,9 +115,17 @@ enum tfe_cmsg_tlv_type TFE_CMSG_TLV_NR_MAX }; +#define TFE_CMSG_FLAG_INIT 0x0 +#define TFE_CMSG_FLAG_USER0 0x1 // 1 << 0 +#define TFE_CMSG_FLAG_USER1 0x2 // 1 << 1 + struct tfe_cmsg* tfe_cmsg_init(); void tfe_cmsg_destroy(struct tfe_cmsg *cmsg); +void tfe_cmsg_dup(struct tfe_cmsg *cmsg); +void tfe_cmsg_set_flag(struct tfe_cmsg *cmsg, uint8_t flag); +uint8_t tfe_cmsg_get_flag(struct tfe_cmsg *cmsg); + int tfe_cmsg_get_value(struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, unsigned char * out_value, size_t sz_out_value_buf, uint16_t * out_size); int tfe_cmsg_set(struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, const unsigned char * value, uint16_t size); diff --git a/common/src/tfe_cmsg.cpp b/common/src/tfe_cmsg.cpp index 8b01030..d16705b 100644 --- a/common/src/tfe_cmsg.cpp +++ b/common/src/tfe_cmsg.cpp @@ -24,6 +24,8 @@ struct tfe_cmsg_tlv struct tfe_cmsg { + uint8_t flag; + uint8_t ref; pthread_rwlock_t rwlock; uint16_t nr_tlvs; struct tfe_cmsg_tlv* tlvs[TFE_CMSG_TLV_NR_MAX]; @@ -43,6 +45,10 @@ struct tfe_cmsg* tfe_cmsg_init() cmsg->size = sizeof(struct tfe_cmsg_serialize_header); pthread_rwlock_init(&(cmsg->rwlock), NULL); + + ATOMIC_ZERO(&cmsg->flag); + ATOMIC_ZERO(&cmsg->ref); + ATOMIC_INC(&cmsg->ref); return cmsg; } @@ -50,6 +56,8 @@ void tfe_cmsg_destroy(struct tfe_cmsg *cmsg) { if(cmsg != NULL) { + if ((__sync_sub_and_fetch(&cmsg->ref, 1) != 0)) + return; pthread_rwlock_wrlock(&cmsg->rwlock); for(int i = 0; i < TFE_CMSG_TLV_NR_MAX; i++) { @@ -57,9 +65,31 @@ void tfe_cmsg_destroy(struct tfe_cmsg *cmsg) } pthread_rwlock_unlock(&cmsg->rwlock); pthread_rwlock_destroy(&cmsg->rwlock); + FREE(&cmsg); } +} - FREE(&cmsg); +void tfe_cmsg_dup(struct tfe_cmsg *cmsg) +{ + if (cmsg == NULL) + return; + ATOMIC_INC(&cmsg->ref); +} + +void tfe_cmsg_set_flag(struct tfe_cmsg *cmsg, uint8_t flag) +{ + if (cmsg == NULL) + return; + ATOMIC_SET(&cmsg->flag, flag); +} + +uint8_t tfe_cmsg_get_flag(struct tfe_cmsg *cmsg) +{ + if (cmsg == NULL) + return 0; + uint8_t flag = 0; + flag = ATOMIC_READ(&cmsg->flag); + return flag; } int tfe_cmsg_set(struct tfe_cmsg * cmsg, enum tfe_cmsg_tlv_type type, const unsigned char * value, uint16_t size) @@ -230,6 +260,7 @@ int tfe_cmsg_deserialize(const unsigned char *data, uint16_t len, struct tfe_cms } cmsg = ALLOC(struct tfe_cmsg, 1); + pthread_rwlock_init(&(cmsg->rwlock), NULL); offset = sizeof(struct tfe_cmsg_serialize_header); nr_tlvs = ntohs(header->nr_tlvs); for(int i = 0; i < nr_tlvs; i++) @@ -258,9 +289,7 @@ int tfe_cmsg_deserialize(const unsigned char *data, uint16_t len, struct tfe_cms offset += length; } cmsg->size = offset; - pthread_rwlock_wrlock(&((*pcmsg)->rwlock)); *pcmsg = cmsg; - pthread_rwlock_unlock(&((*pcmsg)->rwlock)); return 0; error_out: diff --git a/common/src/tfe_ctrl_packet.cpp b/common/src/tfe_ctrl_packet.cpp index 788ff47..4ee7536 100644 --- a/common/src/tfe_ctrl_packet.cpp +++ b/common/src/tfe_ctrl_packet.cpp @@ -28,6 +28,7 @@ void ctrl_packet_parser_init(struct ctrl_pkt_parser *handler) { memset(handler, 0, sizeof(struct ctrl_pkt_parser)); handler->cmsg = tfe_cmsg_init(); + tfe_cmsg_dup(handler->cmsg); } // return 0 : success diff --git a/common/src/tfe_mpack.cpp b/common/src/tfe_mpack.cpp index 06c3028..e7f2893 100644 --- a/common/src/tfe_mpack.cpp +++ b/common/src/tfe_mpack.cpp @@ -245,6 +245,7 @@ int parse_messagepack(const char* data, size_t length, void *ctx) if (strncasecmp(buff, "opening", sizeof(buff)) == 0) { handler->state = SESSION_STATE_OPENING; + goto succ; } else if (strncasecmp(buff, "active", sizeof(buff)) == 0) { @@ -253,10 +254,12 @@ int parse_messagepack(const char* data, size_t length, void *ctx) else if (strncasecmp(buff, "closing", sizeof(buff)) == 0) { handler->state = SESSION_STATE_CLOSING; + goto succ; } else if (strncasecmp(buff, "resetall", sizeof(buff)) == 0) { handler->state = SESSION_STATE_RESETALL; + goto succ; } else { @@ -303,8 +306,11 @@ int parse_messagepack(const char* data, size_t length, void *ctx) TFE_LOG_ERROR(g_default_logger, "%s: unexpected control packet: (proxy no found)", LOG_TAG_CTRLPKT); goto error; } + proxy_map = mpack_node_map_cstr(params, "proxy"); proxy_parse_messagepack(proxy_map, handler); + +succ: mpack_tree_destroy(&tree); return 0; error: diff --git a/common/src/tfe_packet_io.cpp b/common/src/tfe_packet_io.cpp index 699d025..40140ac 100644 --- a/common/src/tfe_packet_io.cpp +++ b/common/src/tfe_packet_io.cpp @@ -956,7 +956,25 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx; struct packet_io *packet_io = thread->ref_io; - char *data; + marsio_buff_t *tx_buffs[1]; + struct metadata meta = {0}; + + uint16_t length = 0; + uint8_t ssl_intercept_status = 0; + uint64_t ssl_upstream_latency = 0; + uint64_t ssl_downstream_latency = 0; + char ssl_upstream_version[64] = {0}; + uint16_t ssl_upstream_version_length = 0; + char ssl_downstream_version[64] = {0}; + uint16_t ssl_downstream_version_length = 0; + uint8_t ssl_cert_verify = 0; + char ssl_error[64] = {0}; + uint16_t ssl_error_length = 0; + char ssl_passthrough_reason[32] = {0}; + uint16_t ssl_passthrough_reason_length = 0; + uint8_t ssl_pinning_state = 0; + + char *data = NULL; size_t size; mpack_writer_t writer; mpack_writer_init_growable(&writer, &data, &size); @@ -982,85 +1000,26 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) mpack_build_map(&writer); mpack_write_cstr(&writer, "ssl_intercept_info"); - mpack_build_array(&writer); // cmsg value begin + mpack_build_array(&writer); - int ret = 0; - uint8_t ssl_intercept_status = 0; - uint16_t length = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (unsigned char *)&ssl_intercept_status, sizeof(ssl_intercept_status), &length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl intercept state from cmsg: %s", strerror(-ret)); - return; - } - uint64_t ssl_upstream_latency = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_LATENCY, (unsigned char *)&ssl_upstream_latency, sizeof(ssl_upstream_latency), &length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl upstream latency from cmsg: %s", strerror(-ret)); - return; - } + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_INTERCEPT_STATE, (unsigned char *)&ssl_intercept_status, sizeof(ssl_intercept_status), &length); - uint64_t ssl_downstream_latency = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_LATENCY, (unsigned char *)&ssl_downstream_latency, sizeof(ssl_downstream_latency), &length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl downstream latency from cmsg: %s", strerror(-ret)); - return; - } + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_LATENCY, (unsigned char *)&ssl_upstream_latency, sizeof(ssl_upstream_latency), &length); - char ssl_upstream_version[64] = {0}; - uint16_t ssl_upstream_version_length = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_VERSION, (unsigned char *)ssl_upstream_version, sizeof(ssl_upstream_version), &length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl upstream version from cmsg: %s", strerror(-ret)); - return; - } + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_LATENCY, (unsigned char *)&ssl_downstream_latency, sizeof(ssl_downstream_latency), &length); - char ssl_downstream_version[64] = {0}; - uint16_t ssl_downstream_version_length = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_VERSION, (unsigned char *)ssl_downstream_version, sizeof(ssl_downstream_version), &ssl_downstream_version_length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl downstream version from cmsg: %s", strerror(-ret)); - return; - } + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_SERVER_SIDE_VERSION, (unsigned char *)ssl_upstream_version, sizeof(ssl_upstream_version), &length); - uint8_t ssl_pinning_state = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&ssl_pinning_state, sizeof(ssl_pinning_state), &length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl pinning state from cmsg: %s", strerror(-ret)); - return; - } + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CLIENT_SIDE_VERSION, (unsigned char *)ssl_downstream_version, sizeof(ssl_downstream_version), &ssl_downstream_version_length); - uint8_t ssl_cert_verify = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CERT_VERIFY, (unsigned char *)&ssl_cert_verify, sizeof(ssl_cert_verify), &length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl cert verify from cmsg: %s", strerror(-ret)); - return; - } + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PINNING_STATE, (unsigned char *)&ssl_pinning_state, sizeof(ssl_pinning_state), &length); - char ssl_error[64] = {0}; - uint16_t ssl_error_length = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_ERROR, (unsigned char *)ssl_error, sizeof(ssl_error), &ssl_error_length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl error from cmsg: %s", strerror(-ret)); - return; - } + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_CERT_VERIFY, (unsigned char *)&ssl_cert_verify, sizeof(ssl_cert_verify), &length); - char ssl_passthrough_reason[32] = {0}; - uint16_t ssl_passthrough_reason_length = 0; - ret = tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (unsigned char *)ssl_passthrough_reason, sizeof(ssl_passthrough_reason), &ssl_passthrough_reason_length); - if (ret < 0) - { - TFE_LOG_ERROR(g_default_logger, "failed at fetch ssl passthrough reason from cmsg: %s", strerror(-ret)); - return; - } + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_ERROR, (unsigned char *)ssl_error, sizeof(ssl_error), &ssl_error_length); + + tfe_cmsg_get_value(s_ctx->cmsg, TFE_CMSG_SSL_PASSTHROUGH_REASON, (unsigned char *)ssl_passthrough_reason, sizeof(ssl_passthrough_reason), &ssl_passthrough_reason_length); mpack_write_u8(&writer, ssl_intercept_status); mpack_write_u64(&writer, ssl_upstream_latency); @@ -1073,33 +1032,29 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx) mpack_write_str(&writer, ssl_passthrough_reason, ssl_passthrough_reason_length); mpack_complete_array(&writer); mpack_complete_map(&writer); - mpack_complete_map(&writer); - mpack_complete_map(&writer); + + char *raw_packet_header_data = s_ctx->ctrl_meta->raw_data; + int raw_packet_header_len = s_ctx->ctrl_meta->l7offset; + marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread_seq); + char *dst = marsio_buff_append(tx_buffs[0], raw_packet_header_len + size); + memcpy(dst, raw_packet_header_data, raw_packet_header_len); + memcpy(dst + raw_packet_header_len, data, size); - // marsio_buff_t *tx_buffs[1]; - // char *raw_packet_header_data = session_ctx->ctrl_meta->raw_data; - // int raw_packet_header_len = session_ctx->ctrl_meta->l7offset; - // marsio_buff_malloc_global(packet_io->instance, tx_buffs, 1, 0, thread_index); - // char *dst = marsio_buff_append(tx_buffs[0], raw_packet_header_len + size); - // memcpy(dst, raw_packet_header_data, raw_packet_header_len); - // memcpy(dst + raw_packet_header_len, data, size); + meta.session_id = s_ctx->session_id; + meta.l7offset = raw_packet_header_len; + meta.is_ctrl_pkt = 1; + meta.sids.num = 1; + meta.sids.elems[0] = acceptor_ctx->firewall_sids; + route_ctx_copy(&meta.route_ctx, &s_ctx->ctrl_meta->route_ctx); + packet_io_set_metadata(tx_buffs[0], &meta); + int nsend = marsio_buff_datalen(tx_buffs[0]); + marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_seq, tx_buffs, 1); - // struct metadata meta = {0}; - // meta.session_id = session_ctx->session_id; - // meta.l7offset = raw_packet_header_len; - // meta.is_ctrl_pkt = 1; - // meta.sids.num = 1; - // meta.sids.elems[0] = sce_ctx->firewall_sids; - // route_ctx_copy(&meta.route_ctx, &session_ctx->ctrl_meta->route_ctx); - // mbuff_set_metadata(tx_buffs[0], &meta); - // int nsend = marsio_buff_datalen(tx_buffs[0]); - // marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_index, tx_buffs, 1); - -end: mpack_writer_destroy(&writer); - free(data); + if (data) + free(data); return; } @@ -1115,6 +1070,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser uint16_t size = 0; char *addr_str = NULL; + unsigned int stream_common_direction; uint8_t stream_protocol_in_char = 0; uint8_t enalbe_decrypted_traffic_steering = 0; struct ethhdr *ether_hdr = NULL; @@ -1239,6 +1195,14 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser goto end; } + // E -> I + if (meta->is_e2i_dir) + stream_common_direction = 'I'; + // I -> E + else + stream_common_direction = 'E'; + tfe_cmsg_set(parser->cmsg, TFE_CMSG_COMMON_DIRECTION, (const unsigned char *)&stream_common_direction, sizeof(stream_common_direction)); + s_ctx = session_ctx_new(); s_ctx->raw_meta_i2e = metadata_new(); s_ctx->raw_meta_e2i = metadata_new(); @@ -1286,6 +1250,8 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser return 0; end: + if (parser->cmsg) + free(parser->cmsg); return -1; } @@ -1315,9 +1281,6 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser { struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; TFE_LOG_INFO(g_default_logger, "%s: session %lu closing", LOG_TAG_PKTIO, s_ctx->session_id); - - send_event_log(s_ctx, thread_seq, ctx); - session_table_delete_by_id(thread->session_table, meta->session_id); return 0; } @@ -1429,23 +1392,6 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; - if (meta.is_e2i_dir) - { - if (metadata_is_empty(s_ctx->raw_meta_e2i)) - { - metadata_deep_copy(s_ctx->raw_meta_e2i, &meta); - } - s_ctx->raw_meta_e2i->sids = meta.sids; - } - else - { - if (metadata_is_empty(s_ctx->raw_meta_i2e)) - { - metadata_deep_copy(s_ctx->raw_meta_i2e, &meta); - } - s_ctx->raw_meta_i2e->sids = meta.sids; - } - if (meta.is_decrypted) { // c2s @@ -1473,11 +1419,28 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx } else { + if (meta.is_e2i_dir) + { + if (metadata_is_empty(s_ctx->raw_meta_e2i)) + { + metadata_deep_copy(s_ctx->raw_meta_e2i, &meta); + } + s_ctx->raw_meta_e2i->sids = meta.sids; + } + else + { + if (metadata_is_empty(s_ctx->raw_meta_i2e)) + { + metadata_deep_copy(s_ctx->raw_meta_i2e, &meta); + } + s_ctx->raw_meta_i2e->sids = meta.sids; + } #if 0 struct raw_pkt_parser raw_parser; raw_packet_parser_init(&raw_parser, meta->session_id, LAYER_TYPE_ALL, 8); const void *payload = raw_packet_parser_parse(&raw_parser, (const void *)meta->raw_data, meta->raw_len); buff_size = raw_len - ((char *)payload - meta->raw_data) + sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct tcphdr); + #endif // send to tap0 add_ether_header(raw_data, packet_io->config.src_mac, packet_io->config.tap_mac); @@ -1488,6 +1451,12 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx tfe_tap_write_per_thread(thread->tap_ctx->tap_fd, raw_data, raw_len, g_default_logger); } throughput_metrics_inc(&g_metrics->tap_pkt_tx, 1, raw_len); + + uint8_t flag = tfe_cmsg_get_flag(s_ctx->cmsg); + if (flag & TFE_CMSG_FLAG_USER0) { + send_event_log(s_ctx, thread_seq, ctx); + tfe_cmsg_set_flag(s_ctx->cmsg, TFE_CMSG_FLAG_INIT); + } } marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); time_echo(meta.session_id, "raw pkg from nf end"); diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 8e467bf..0f159d4 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -538,7 +538,7 @@ void tfe_proxy_acceptor_init(struct tfe_proxy * proxy, const char * profile) MESA_load_profile_uint_def(profile, "system", "enable_kni_v1", &proxy->en_kni_v1_acceptor, 0); MESA_load_profile_uint_def(profile, "system", "enable_kni_v2", &proxy->en_kni_v2_acceptor, 0); MESA_load_profile_uint_def(profile, "system", "enable_kni_v3", &proxy->en_kni_v3_acceptor, 0); - MESA_load_profile_uint_def(profile, "system", "enable_kni_v4", &proxy->en_kni_v4_acceptor, 1); + MESA_load_profile_uint_def(profile, "system", "enable_kni_v4", &proxy->en_kni_v4_acceptor, 0); int ret = proxy->en_kni_v1_acceptor + proxy->en_kni_v2_acceptor + proxy->en_kni_v3_acceptor + proxy->en_kni_v4_acceptor; CHECK_OR_EXIT((ret == 1), "Invalid KNI acceptor. Exit."); diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index 559aae3..ab6494d 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -493,6 +493,7 @@ static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg TFE_PROXY_STAT_INCREASE(STAT_STREAM_BYPASS, 1); _stream->is_first_call_rxcb = 1; tfe_set_intercept_metric(&_stream->head, 1, 0, 0, 0, 0); + tfe_cmsg_set_flag(_stream->cmsg, TFE_CMSG_FLAG_USER0); } int inbuff_len = evbuffer_get_length(__input_buffer); @@ -1276,6 +1277,8 @@ void ssl_downstream_create_on_success(future_result_t * result, void * user) TFE_PROXY_STAT_INCREASE(STAT_STEERING_SSL_CONN, 1); } + tfe_cmsg_set_flag(_stream->cmsg, TFE_CMSG_FLAG_USER0); + return; }