TSG-13938 tsg-service-chaining-engine区分mirror和steering并更新fieldstat计数

This commit is contained in:
luwenpeng
2023-02-27 14:37:31 +08:00
parent 586fb056a6
commit 92af3e1fee
5 changed files with 226 additions and 55 deletions

View File

@@ -75,6 +75,7 @@ enum raw_pkt_action
enum inject_pkt_action
{
INJT_PKT_ERR_DROP,
INJT_PKT_MIRR_RX_DROP,
INJT_PKT_HIT_BLOCK,
INJT_PKT_HIT_FWD2SF, // forward to service function
INJT_PKT_HIT_FWD2NF, // forward to network function
@@ -128,6 +129,7 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf
// reutrn : RAW_PKT_HIT_FORWARD
static enum raw_pkt_action handle_raw_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx, int *action_bytes);
// return : INJT_PKT_ERR_DROP
// return : INJT_PKT_MIRR_RX_DROP
// return : INJT_PKT_HIT_BLOCK
// return : INJT_PKT_HIT_FWD2SF
// return : INJT_PKT_HIT_FWD2NF
@@ -137,6 +139,10 @@ static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, mar
// return + : send n bytes
// return -1 : error
static int forward_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx);
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
static int mirror_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx);
// rx_buff : include g_vxlan header
// return + : send n bytes
// return -1 : error
@@ -165,7 +171,10 @@ static void session_value_free_cb(void *ctx);
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int marsio_buff_is_keepalive(marsio_buff_t *rx_buff);
static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff);
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_upstream_keepalive_packet(marsio_buff_t *rx_buff);
/******************************************************************************
* API Definition
@@ -345,9 +354,9 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi
marsio_buff_t *rx_buff = rx_buffs[j];
int raw_len = marsio_buff_datalen(rx_buff);
if (marsio_buff_is_keepalive(rx_buff))
if (is_downstream_keepalive_packet(rx_buff))
{
throughput_metrics_inc(&g_metrics->keepalive_packet_rx, 1, raw_len);
throughput_metrics_inc(&g_metrics->downlink_keepalive_packet_rx, 1, raw_len);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
continue;
}
@@ -379,6 +388,7 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi
throughput_metrics_inc(&g_metrics->hit_block_policy, 1, action_bytes);
break;
case RAW_PKT_HIT_FORWARD:
throughput_metrics_inc(&g_metrics->steering_tx, 1, action_bytes);
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, action_bytes);
break;
}
@@ -442,6 +452,13 @@ int packet_io_polling_endpoint(struct packet_io *handle, int thread_seq, void *c
int data_len = marsio_buff_datalen(rx_buff);
throughput_metrics_inc(&g_metrics->dev_endpoint_rx, 1, data_len);
if (is_upstream_keepalive_packet(rx_buff))
{
throughput_metrics_inc(&g_metrics->uplink_keepalive_packet_rx, 1, data_len);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
continue;
}
int action_bytes = 0;
enum inject_pkt_action action = handle_inject_packet(handle, rx_buff, thread_seq, ctx, &action_bytes);
assert(action_bytes > 0);
@@ -450,13 +467,20 @@ int packet_io_polling_endpoint(struct packet_io *handle, int thread_seq, void *c
case INJT_PKT_ERR_DROP:
throughput_metrics_inc(&g_metrics->dev_endpoint_err_drop, 1, action_bytes);
break;
case INJT_PKT_MIRR_RX_DROP:
throughput_metrics_inc(&g_metrics->mirroring_rx_drop, 1, data_len); // use data_len
break;
case INJT_PKT_HIT_BLOCK:
throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len
throughput_metrics_inc(&g_metrics->hit_block_policy, 1, action_bytes);
break;
case INJT_PKT_HIT_FWD2SF: // forward to next service function
case INJT_PKT_HIT_FWD2SF: // forward to next service function
throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len
throughput_metrics_inc(&g_metrics->steering_tx, 1, action_bytes); // use action_bytes
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, action_bytes);
break;
case INJT_PKT_HIT_FWD2NF: // forward to network function
case INJT_PKT_HIT_FWD2NF: // forward to network function
throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len
throughput_metrics_inc(&g_metrics->dev_nf_interface_tx, 1, action_bytes);
break;
}
@@ -724,6 +748,8 @@ static enum raw_pkt_action handle_raw_packet(struct packet_io *handle, marsio_bu
{
int nsend = 0;
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
int raw_len = marsio_buff_datalen(rx_buff);
*action_bytes = 0;
@@ -805,43 +831,70 @@ static enum raw_pkt_action handle_raw_packet(struct packet_io *handle, marsio_bu
return RAW_PKT_ERR_BYPASS;
}
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
if (node->sff_forward_type == FORWARD_TYPE_STEERING)
{
throughput_metrics_inc(&node->tx, 1, nsend);
*action_bytes = nsend;
return RAW_PKT_HIT_FORWARD;
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
{
throughput_metrics_inc(&node->tx, 1, nsend);
*action_bytes = nsend;
return RAW_PKT_HIT_FORWARD;
}
else
{
LOG_ERROR("%s: processing raw packet, session %lu %s forwarding packet to service function failed, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
}
}
else
{
LOG_ERROR("%s: processing raw packet, session %lu %s forwarding packet to service function failed, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = mirror_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
{
throughput_metrics_inc(&node->tx, 1, nsend);
throughput_metrics_inc(&g_metrics->mirroring_tx, 1, nsend);
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, nsend);
continue;
}
else
{
LOG_ERROR("%s: processing raw packet, session %lu %s mirroring packet to service function failed, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_ERR_BYPASS;
}
}
default:
continue;
}
}
// BYPASS ALL SF
LOG_INFO("%s: session %lu %s bypass all service function", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
// BYPASS ALL SF or LAST SF IS MIRRORING
marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1);
*action_bytes = raw_len;
return RAW_PKT_HIT_BYPASS;
}
// return : INJT_PKT_ERR_DROP
// return : INJT_PKT_MIRR_RX_DROP
// return : INJT_PKT_HIT_BLOCK
// return : INJT_PKT_HIT_FWD2SF
// return : INJT_PKT_HIT_FWD2NF
static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx, int *action_bytes)
{
int nsend = 0;
int mbuff_is_adj = 0;
struct thread_ctx *thread = (struct thread_ctx *)ctx;
struct global_metrics *g_metrics = thread->ref_metrics;
struct g_vxlan *g_vxlan_hdr = NULL;
int raw_len = marsio_buff_datalen(rx_buff);
@@ -908,7 +961,15 @@ static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, mar
return INJT_PKT_ERR_DROP;
}
throughput_metrics_inc(&chaining->chaining[sf_index].rx, 1, meta.raw_len);
if (chaining->chaining[sf_index].sff_forward_type == FORWARD_TYPE_MIRRORING)
{
LOG_DEBUG("%s: unexpected inject packet, session %lu %s with sf_profile_id %d executes mirror and does not require reflow, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, chaining->chaining[sf_index].sf_profile_id);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_MIRR_RX_DROP;
}
throughput_metrics_inc(&chaining->chaining[sf_index].rx, 1, raw_len);
int next_sf_index;
for (next_sf_index = sf_index + 1; next_sf_index < chaining->chaining_used; next_sf_index++)
@@ -940,25 +1001,55 @@ static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, mar
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
marsio_buff_adj(rx_buff, raw_len - meta.raw_len);
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
if (mbuff_is_adj == 0)
{
throughput_metrics_inc(&node->tx, 1, nsend);
*action_bytes = nsend;
return INJT_PKT_HIT_FWD2SF;
marsio_buff_adj(rx_buff, raw_len - meta.raw_len);
mbuff_is_adj = 1;
}
if (node->sff_forward_type == FORWARD_TYPE_STEERING)
{
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
{
throughput_metrics_inc(&node->tx, 1, nsend);
*action_bytes = nsend;
return INJT_PKT_HIT_FWD2SF;
}
else
{
LOG_ERROR("%s: processing inject packet, session %lu %s forwarding packet to service function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
}
else
{
LOG_ERROR("%s: processing inject packet, session %lu %s forwarding packet to service function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
nsend = mirror_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx);
if (nsend > 0)
{
throughput_metrics_inc(&node->tx, 1, nsend);
throughput_metrics_inc(&g_metrics->mirroring_tx, 1, nsend);
throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, nsend);
continue;
}
else
{
LOG_ERROR("%s: processing inject packet, session %lu %s mirroring packet to service function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq);
*action_bytes = raw_len;
return INJT_PKT_ERR_DROP;
}
}
default:
assert(0);
continue;
@@ -1027,6 +1118,32 @@ static int forward_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff
return raw_len;
}
// rx_buff : not include g_vxlan header
// return + : send n bytes
// return -1 : error
static int mirror_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx)
{
marsio_buff_t *new_buff = NULL;
if (marsio_buff_malloc_global(handle->instance, &new_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
{
LOG_ERROR("%s: unable to malloc buff on marsio instance, thread_seq: %d", LOG_TAG_PKTIO, thread_seq);
return -1;
}
unsigned int raw_len = marsio_buff_datalen(rx_buff);
const char *raw_data = marsio_buff_mtod(rx_buff);
char *copy_ptr = marsio_buff_append(new_buff, raw_len);
memcpy(copy_ptr, raw_data, raw_len);
int nsend = forward_packet_to_sf(handle, new_buff, meta, sf, thread_seq, ctx);
if (nsend == -1)
{
marsio_buff_free(handle->instance, &new_buff, 1, 0, thread_seq);
}
return nsend;
}
// rx_buff : include g_vxlan header
// return + : send n bytes
// return -1 : error
@@ -1145,6 +1262,7 @@ static int forward_all_sf_packet_to_nf(struct packet_io *handle, marsio_buff_t *
struct g_vxlan *g_vxlan_hdr = NULL;
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
if (g_vxlan_decode(&g_vxlan_hdr, raw_data, raw_len) == -1)
{
LOG_ERROR("%s: unexpected inject packet, not a vxlan-encapsulated packet, drop !!!", LOG_TAG_PKTIO);
@@ -1276,6 +1394,13 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser
{
struct session_ctx *s_ctx = (struct session_ctx *)node->val_data;
LOG_INFO("%s: session %lu %s closing", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string);
struct selected_chaining *chaining = s_ctx->chaining;
for (int i = 0; i < chaining->chaining_used; i++)
{
struct selected_sf *node = &(chaining->chaining[i]);
LOG_INFO("%s: session %lu %s metrics log: policy %d sff_profile_id %d sf_profile_id %d sf_need_skip %d sf_action_reason %s rx_pkts %lu rx_bytes %lu tx_pkts %lu tx_bytes %lu", LOG_TAG_METRICS, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, node->policy_id, node->sff_profile_id, node->sf_profile_id, node->sf_need_skip, session_action_reason_to_string(node->sf_action_reason), node->rx.n_pkts, node->rx.n_bytes, node->tx.n_pkts, node->tx.n_bytes);
}
// TODO send log to firewall
@@ -1360,11 +1485,11 @@ static void session_value_free_cb(void *ctx)
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int marsio_buff_is_keepalive(marsio_buff_t *rx_buff)
static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff)
{
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
if (raw_data == NULL || raw_len == 0)
if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr)))
{
return 0;
}
@@ -1378,4 +1503,36 @@ static int marsio_buff_is_keepalive(marsio_buff_t *rx_buff)
{
return 0;
}
}
// return 0 : not keepalive packet
// return 1 : is keepalive packet
static int is_upstream_keepalive_packet(marsio_buff_t *rx_buff)
{
int raw_len = marsio_buff_datalen(rx_buff);
char *raw_data = marsio_buff_mtod(rx_buff);
if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct udp_hdr)))
{
return 0;
}
struct ethhdr *eth_hdr = (struct ethhdr *)raw_data;
if (eth_hdr->h_proto != htons(ETH_P_IP))
{
return 0;
}
struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr));
if (ip_hdr->ip_p != IPPROTO_UDP)
{
return 0;
}
struct udp_hdr *udp_hdr = (struct udp_hdr *)((char *)ip_hdr + sizeof(struct ip));
if (udp_hdr->uh_dport != htons(3784))
{
return 0;
}
return 1;
}