TSG-14930 TFE支持发送控制报文给SAPP

This commit is contained in:
wangmenglan
2023-05-09 22:12:38 +08:00
parent fecc023418
commit 542f4cbdfa
12 changed files with 296 additions and 334 deletions

View File

@@ -20,7 +20,7 @@
#include "tfe_ctrl_packet.h"
#include "tfe_raw_packet.h"
#include "io_uring.h"
#include "tfe_metrics.h"
#include "tfe_packet_io_fs.h"
#include "tfe_cmsg.h"
#include "tfe_tcp_restore.h"
#include "tfe_stream.h"
@@ -162,14 +162,6 @@ extern void chaining_policy_enforce(struct chaining_policy_enforcer *enforcer, s
/******************************************************************************
* STATIC
******************************************************************************/
static void time_echo(uint64_t session_id, char *info)
{
time_t t;
time(&t);
TFE_LOG_ERROR(g_default_logger, "%s: session:%lu, time:%s %s", LOG_TAG_PKTIO, session_id, ctime(&t), info);
}
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff)
@@ -595,8 +587,8 @@ static int tcp_restore_set_from_cmsg(struct tfe_cmsg *cmsg, struct tcp_restore_i
ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_TCP_RESTORE_WSACLE_SERVER, (unsigned char *)&wsacle_server, sizeof(uint8_t), &length);
if (ret == 0)
{
restore_info->client.wscale_perm = true;
restore_info->client.wscale = wsacle_server;
restore_info->server.wscale_perm = true;
restore_info->server.wscale = wsacle_server;
}
uint8_t sack_client;
@@ -889,6 +881,7 @@ static int packet_io_set_metadata(marsio_buff_t *tx_buff, struct metadata *meta)
if (meta->is_ctrl_pkt)
{
marsio_buff_set_ctrlbuf(tx_buff);
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &(meta->l7offset), sizeof(meta->l7offset)) != 0)
{
TFE_LOG_ERROR(g_default_logger, "%s: unable to set l7offset for metadata", LOG_TAG_PKTIO);
@@ -952,7 +945,7 @@ static void packet_io_dump_metadata(marsio_buff_t *tx_buff, struct metadata *met
*/
static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
@@ -979,6 +972,7 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
mpack_writer_t writer;
mpack_writer_init_growable(&writer, &data, &size);
// root map
mpack_build_map(&writer);
mpack_write_cstr(&writer, "tsync");
@@ -994,9 +988,11 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
mpack_write_cstr(&writer, "log_update");
mpack_write_cstr(&writer, "params");
// params map
mpack_build_map(&writer);
mpack_write_cstr(&writer, "proxy");
// proxy map
mpack_build_map(&writer);
mpack_write_cstr(&writer, "ssl_intercept_info");
@@ -1031,9 +1027,30 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
mpack_write_str(&writer, ssl_error, ssl_error_length);
mpack_write_str(&writer, ssl_passthrough_reason, ssl_passthrough_reason_length);
mpack_complete_array(&writer);
// proxy map end
mpack_complete_map(&writer);
// params map end
mpack_complete_map(&writer);
// root map end
mpack_complete_map(&writer);
// finish writing
if (mpack_writer_destroy(&writer) != mpack_ok)
{
assert(0);
if (data)
{
free(data);
data = NULL;
}
return;
}
struct ethhdr *eth_hdr = (struct ethhdr *)s_ctx->ctrl_meta->raw_data;
struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr));
struct tcphdr *tcp_hdr = (struct tcphdr *)((char *)ip_hdr + sizeof(struct ip));
// ip_hdr->ip_len = htons(sizeof(struct ip) + (ntohs(tcp_hdr->th_off) * 4) + size);
ip_hdr->ip_len = htons(sizeof(struct ip) + 20 + size);
char *raw_packet_header_data = s_ctx->ctrl_meta->raw_data;
int raw_packet_header_len = s_ctx->ctrl_meta->l7offset;
@@ -1052,7 +1069,6 @@ static void send_event_log(struct session_ctx *s_ctx, int thread_seq, void *ctx)
int nsend = marsio_buff_datalen(tx_buffs[0]);
marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread_seq, tx_buffs, 1);
mpack_writer_destroy(&writer);
if (data)
free(data);
return;
@@ -1083,7 +1099,7 @@ static int handle_session_opening(struct metadata *meta, struct ctrl_pkt_parser
struct sockaddr_in *in_addr_client = (struct sockaddr_in *)&restore_info.client.addr;
struct sockaddr_in *in_addr_server = (struct sockaddr_in *)&restore_info.server.addr;
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io *packet_io = thread->ref_io;
struct raw_pkt_parser raw_parser;
@@ -1259,7 +1275,7 @@ end:
// return -1 : error
static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
if (!node)
@@ -1274,7 +1290,7 @@ static int handle_session_active(struct metadata *meta, struct ctrl_pkt_parser *
// return -1 : error
static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct session_node *node = session_table_search_by_id(thread->session_table, meta->session_id);
if (node)
@@ -1292,14 +1308,14 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
// return -1 : error
static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser *parser, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
TFE_LOG_ERROR(g_default_logger, "%s: session %lu resetall: notification clears all session tables !!!", LOG_TAG_PKTIO, meta->session_id);
for (int i = 0; i < acceptor_ctx->nr_worker_threads; i++)
{
struct acceptor_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i];
struct packet_io_thread_ctx *thread_ctx = &acceptor_ctx->work_threads[i];
__atomic_fetch_add(&thread_ctx->session_table_need_reset, 1, __ATOMIC_RELAXED);
}
@@ -1310,9 +1326,9 @@ static int handle_session_resetall(struct metadata *meta, struct ctrl_pkt_parser
// return -1 : error
static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
struct metadata meta;
if (packet_io_get_metadata(rx_buff, &meta) == -1)
@@ -1340,21 +1356,21 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf
switch (ctrl_parser.state)
{
case SESSION_STATE_OPENING:
__atomic_fetch_add(&g_metrics->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_opening_num, 1, __ATOMIC_RELAXED);
// when session opening, firewall not send policy id
// return handle_session_opening(&meta, &ctrl_parser, thread_seq, ctx);
break;
case SESSION_STATE_CLOSING:
__atomic_fetch_add(&g_metrics->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_closing_num, 1, __ATOMIC_RELAXED);
return handle_session_closing(&meta, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_ACTIVE:
__atomic_fetch_add(&g_metrics->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_active_num, 1, __ATOMIC_RELAXED);
return handle_session_active(&meta, &ctrl_parser, thread_seq, ctx);
case SESSION_STATE_RESETALL:
__atomic_fetch_add(&g_metrics->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_resetall_num, 1, __ATOMIC_RELAXED);
return handle_session_resetall(&meta, &ctrl_parser, thread_seq, ctx);
default:
__atomic_fetch_add(&g_metrics->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
__atomic_fetch_add(&packet_io_fs->ctrl_pkt_error_num, 1, __ATOMIC_RELAXED);
break;
}
@@ -1363,9 +1379,9 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf
static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io *packet_io = thread->ref_io;
struct global_metrics *g_metrics = thread->ref_metrics;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
@@ -1380,7 +1396,6 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
return -1;
}
time_echo(meta.session_id, "raw pkg from nf start");
struct session_node *node = session_table_search_by_id(thread->session_table, meta.session_id);
if (node == NULL)
@@ -1403,7 +1418,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
else {
tfe_tap_write_per_thread(thread->tap_ctx->tap_s, raw_data, raw_len, g_default_logger);
}
throughput_metrics_inc(&g_metrics->tap_s_pkt_tx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->tap_s_pkt_tx, 1, raw_len);
}
// s2c
else {
@@ -1414,7 +1429,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
else {
tfe_tap_write_per_thread(thread->tap_ctx->tap_c, raw_data, raw_len, g_default_logger);
}
throughput_metrics_inc(&g_metrics->tap_c_pkt_tx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->tap_c_pkt_tx, 1, raw_len);
}
}
else
@@ -1450,7 +1465,7 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
else {
tfe_tap_write_per_thread(thread->tap_ctx->tap_fd, raw_data, raw_len, g_default_logger);
}
throughput_metrics_inc(&g_metrics->tap_pkt_tx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->tap_pkt_tx, 1, raw_len);
uint8_t flag = tfe_cmsg_get_flag(s_ctx->cmsg);
if (flag & TFE_CMSG_FLAG_USER0) {
@@ -1459,11 +1474,9 @@ static int handle_raw_packet_from_nf(struct packet_io *handle, marsio_buff_t *rx
}
}
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
time_echo(meta.session_id, "raw pkg from nf end");
return 0;
}
/******************************************************************************
* EXTERN
******************************************************************************/
@@ -1495,7 +1508,7 @@ void tfe_tap_ctx_destory(struct tap_ctx *handler)
struct tap_ctx *tfe_tap_ctx_create(void *ctx)
{
int ret = 0;
struct acceptor_thread_ctx *thread_ctx = (struct acceptor_thread_ctx *)ctx;
struct packet_io_thread_ctx *thread_ctx = (struct packet_io_thread_ctx *)ctx;
struct acceptor_kni_v4 *acceptor_ctx = thread_ctx->ref_acceptor_ctx;
struct packet_io *packet_io = acceptor_ctx->io;
struct tap_ctx *tap_ctx = (struct tap_ctx *)calloc(1, sizeof(struct tap_ctx));
@@ -1553,7 +1566,7 @@ error_out:
return NULL;
}
int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx)
int packet_io_thread_init(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx)
{
if (marsio_thread_init(handle->instance) != 0)
{
@@ -1564,7 +1577,7 @@ int packet_io_thread_init(struct packet_io *handle, struct acceptor_thread_ctx *
return 0;
}
void packet_io_thread_wait(struct packet_io *handle, struct acceptor_thread_ctx *thread_ctx, int timeout_ms)
void packet_io_thread_wait(struct packet_io *handle, struct packet_io_thread_ctx *thread_ctx, int timeout_ms)
{
struct mr_vdev *vdevs[] = {handle->dev_nf_interface.mr_dev};
@@ -1679,8 +1692,8 @@ error_out:
// return n_packet_recv
int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)ctx;
struct packet_io_fs *packet_io_fs = thread->ret_fs_state;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
@@ -1715,14 +1728,14 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi
if (marsio_buff_is_ctrlbuf(rx_buff))
{
throughput_metrics_inc(&g_metrics->ctrl_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->ctrl_pkt_rx, 1, raw_len);
// all control packet need bypass
handle_control_packet(handle, rx_buff, thread_seq, ctx);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
}
else
{
throughput_metrics_inc(&g_metrics->raw_pkt_rx, 1, raw_len);
throughput_metrics_inc(&packet_io_fs->raw_pkt_rx, 1, raw_len);
handle_raw_packet_from_nf(handle, rx_buff, thread_seq, ctx);
}
}
@@ -1732,7 +1745,7 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi
void handle_decryption_packet_from_tap(const char *data, int len, void *args)
{
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
@@ -1752,8 +1765,6 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
return;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
time_echo(s_ctx->session_id, "decryption pkg from nf start");
marsio_buff_t *tx_buffs[1];
int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index);
if (alloc_ret < 0){
@@ -1791,14 +1802,13 @@ void handle_decryption_packet_from_tap(const char *data, int len, void *args)
}
packet_io_set_metadata(tx_buffs[0], &meta);
marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1);
time_echo(s_ctx->session_id, "decryption pkg from nf end");
}
void handle_raw_packet_from_tap(const char *data, int len, void *args)
{
char *src_mac = NULL;
char *dst_mac = NULL;
struct acceptor_thread_ctx *thread = (struct acceptor_thread_ctx *)args;
struct packet_io_thread_ctx *thread = (struct packet_io_thread_ctx *)args;
struct acceptor_kni_v4 *acceptor_ctx = thread->ref_acceptor_ctx;
struct packet_io *packet_io = thread->ref_io;
@@ -1818,8 +1828,6 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
return;
}
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
time_echo(s_ctx->session_id, "raw pkg from tap start");
marsio_buff_t *tx_buffs[1];
int alloc_ret = marsio_buff_malloc_device(packet_io->dev_nf_interface.mr_dev, tx_buffs, 1, 0, thread->thread_index);
if (alloc_ret < 0){
@@ -1867,5 +1875,4 @@ void handle_raw_packet_from_tap(const char *data, int len, void *args)
packet_io_set_metadata(tx_buffs[0], &meta);
add_ether_header(dst, src_mac, dst_mac);
marsio_send_burst(packet_io->dev_nf_interface.mr_path, thread->thread_index, tx_buffs, 1);
time_echo(s_ctx->session_id, "raw pkg from tap end");
}