diff --git a/platform/include/global_metrics.h b/platform/include/global_metrics.h index 977ef8e..14e9f99 100644 --- a/platform/include/global_metrics.h +++ b/platform/include/global_metrics.h @@ -40,8 +40,9 @@ struct global_metrics struct throughput_metrics mirroring_tx; // 累计值 struct throughput_metrics mirroring_rx_drop; // 累计值 - struct throughput_metrics keepalive_packet_rx; // 累计值 - struct throughput_metrics control_packet_rx; // 累计值 + struct throughput_metrics downlink_keepalive_packet_rx; // 累计值 + struct throughput_metrics uplink_keepalive_packet_rx; // 累计值 + struct throughput_metrics control_packet_rx; // 累计值 uint64_t control_packet_opening_num; // 累计值 uint64_t control_packet_active_num; // 累计值 @@ -58,7 +59,7 @@ struct global_metrics struct global_metrics_config config; screen_stat_handle_t fs_handle; - int fs_id[32]; + int fs_id[128]; }; struct global_metrics *global_metrics_create(const char *profile); diff --git a/platform/src/global_metrics.cpp b/platform/src/global_metrics.cpp index cc2ea5e..72ca03c 100644 --- a/platform/src/global_metrics.cpp +++ b/platform/src/global_metrics.cpp @@ -62,8 +62,10 @@ enum SCE_STAT_FIELD STAT_CURRENT_SESSION_NUMS, // keepalive packet - STAT_KEEPALIVE_RX_PKT, - STAT_KEEPALIVE_RX_B, + STAT_DOWNLINK_KEEPALIVE_RX_PKT, + STAT_DOWNLINK_KEEPALIVE_RX_B, + STAT_UPLINK_KEEPALIVE_RX_PKT, + STAT_UPLINK_KEEPALIVE_RX_B, // health check STAT_SF_ACTIVE_TIMES, @@ -114,8 +116,8 @@ static const char *stat_map[] = // mirroring [STAT_MIRRORING_TX_PKT] = "mirr_tx_pkt", [STAT_MIRRORING_TX_B] = "mirr_tx_B", - [STAT_MIRRORING_RX_DROP_PKT] = "mirr_rx_drop_pkt", - [STAT_MIRRORING_RX_DROP_B] = "mirro_rx_drop_B", + [STAT_MIRRORING_RX_DROP_PKT] = "mirr_rx_dop_pkt", + [STAT_MIRRORING_RX_DROP_B] = "mirr_rx_dop_B", // control packet [STAT_CONTROL_RX_PKT] = "ctrl_rx_pkt", @@ -131,14 +133,16 @@ static const char *stat_map[] = [STAT_CURRENT_SESSION_NUMS] = "curr_sess_num", // keepalive packet - [STAT_KEEPALIVE_RX_PKT] = "kepalive_rx_pkt", - [STAT_KEEPALIVE_RX_B] = "kepalive_rx_B", + [STAT_DOWNLINK_KEEPALIVE_RX_PKT] = "dlnk_kep_rx_pkt", + [STAT_DOWNLINK_KEEPALIVE_RX_B] = "dlnk_kep_rx_B", + [STAT_UPLINK_KEEPALIVE_RX_PKT] = "ulnk_kep_rx_pkt", + [STAT_UPLINK_KEEPALIVE_RX_B] = "ulnk_kep_rx_B", // health check - [STAT_SF_ACTIVE_TIMES] = "sf_active_num", - [STAT_SF_INACTIVE_TIMES] = "sf_inactive_num", - [STAT_SF_MAC_EXIST_TIMES] = "sf_mac_succ_num", - [STAT_SF_MAC_NOEXIST_TIMES] = "sf_mac_err_num", + [STAT_SF_ACTIVE_TIMES] = "sf_active", + [STAT_SF_INACTIVE_TIMES] = "sf_inactive", + [STAT_SF_MAC_EXIST_TIMES] = "sf_mac_succ", + [STAT_SF_MAC_NOEXIST_TIMES] = "sf_mac_err", [STAT_MAX] = NULL}; @@ -196,6 +200,13 @@ struct global_metrics *global_metrics_create(const char *profile) FS_set_para(metrics->fs_handle, STATS_FORMAT, &metrics->config.statsd_format, sizeof(metrics->config.statsd_format)); } + if (STAT_MAX >= (sizeof(metrics->fs_id) / sizeof(metrics->fs_id[0]))) + { + LOG_ERROR("%s: field stat has insufficient space to store fs_id, and supports a maximum of %lu fsids, but %d is needed ", LOG_TAG_METRICS, (sizeof(metrics->fs_id) / sizeof(metrics->fs_id[0])), STAT_MAX); + global_metrics_destory(metrics); + return NULL; + } + for (int i = 0; i < STAT_MAX; i++) { metrics->fs_id[i] = FS_register(metrics->fs_handle, FS_STYLE_FIELD, FS_CALC_CURRENT, stat_map[i]); @@ -261,8 +272,10 @@ void global_metrics_dump(struct global_metrics *metrics) FS_operate(metrics->fs_handle, metrics->fs_id[STAT_MIRRORING_RX_DROP_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->mirroring_rx_drop.n_bytes), 0, __ATOMIC_RELAXED)); // keepalive packet - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEEPALIVE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->keepalive_packet_rx.n_pkts), 0, __ATOMIC_RELAXED)); - FS_operate(metrics->fs_handle, metrics->fs_id[STAT_KEEPALIVE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->keepalive_packet_rx.n_bytes), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DOWNLINK_KEEPALIVE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->downlink_keepalive_packet_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_DOWNLINK_KEEPALIVE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->downlink_keepalive_packet_rx.n_bytes), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_UPLINK_KEEPALIVE_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->uplink_keepalive_packet_rx.n_pkts), 0, __ATOMIC_RELAXED)); + FS_operate(metrics->fs_handle, metrics->fs_id[STAT_UPLINK_KEEPALIVE_RX_B], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->uplink_keepalive_packet_rx.n_bytes), 0, __ATOMIC_RELAXED)); // control packet FS_operate(metrics->fs_handle, metrics->fs_id[STAT_CONTROL_RX_PKT], 0, FS_OP_SET, __atomic_fetch_add(&(metrics->control_packet_rx.n_pkts), 0, __ATOMIC_RELAXED)); diff --git a/platform/src/main.cpp b/platform/src/main.cpp index 36d4000..9e58fc0 100644 --- a/platform/src/main.cpp +++ b/platform/src/main.cpp @@ -31,13 +31,13 @@ static void *worker_thread_cycle(void *arg) n_packet_recv = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx); if (n_packet_recv) { - LOG_INFO("%s: worker thread %d recv %03d packets from nf_interface", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv); + // LOG_INFO("%s: worker thread %d recv %03d packets from nf_interface", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv); } n_packet_recv = packet_io_polling_endpoint(handle, thread_ctx->thread_index, thread_ctx); if (n_packet_recv) { - LOG_INFO("%s: worker thread %d recv %03d packets from endpoint", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv); + // LOG_INFO("%s: worker thread %d recv %03d packets from endpoint", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv); } if (__atomic_fetch_add(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED) > 0) diff --git a/platform/src/packet_io.cpp b/platform/src/packet_io.cpp index 1070ae7..054397c 100644 --- a/platform/src/packet_io.cpp +++ b/platform/src/packet_io.cpp @@ -75,6 +75,7 @@ enum raw_pkt_action enum inject_pkt_action { INJT_PKT_ERR_DROP, + INJT_PKT_MIRR_RX_DROP, INJT_PKT_HIT_BLOCK, INJT_PKT_HIT_FWD2SF, // forward to service function INJT_PKT_HIT_FWD2NF, // forward to network function @@ -128,6 +129,7 @@ static int handle_control_packet(struct packet_io *handle, marsio_buff_t *rx_buf // reutrn : RAW_PKT_HIT_FORWARD static enum raw_pkt_action handle_raw_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx, int *action_bytes); // return : INJT_PKT_ERR_DROP +// return : INJT_PKT_MIRR_RX_DROP // return : INJT_PKT_HIT_BLOCK // return : INJT_PKT_HIT_FWD2SF // return : INJT_PKT_HIT_FWD2NF @@ -137,6 +139,10 @@ static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, mar // return + : send n bytes // return -1 : error static int forward_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx); +// rx_buff : not include g_vxlan header +// return + : send n bytes +// return -1 : error +static int mirror_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx); // rx_buff : include g_vxlan header // return + : send n bytes // return -1 : error @@ -165,7 +171,10 @@ static void session_value_free_cb(void *ctx); // return 0 : not keepalive packet // return 1 : is keepalive packet -static int marsio_buff_is_keepalive(marsio_buff_t *rx_buff); +static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff); +// return 0 : not keepalive packet +// return 1 : is keepalive packet +static int is_upstream_keepalive_packet(marsio_buff_t *rx_buff); /****************************************************************************** * API Definition @@ -345,9 +354,9 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi marsio_buff_t *rx_buff = rx_buffs[j]; int raw_len = marsio_buff_datalen(rx_buff); - if (marsio_buff_is_keepalive(rx_buff)) + if (is_downstream_keepalive_packet(rx_buff)) { - throughput_metrics_inc(&g_metrics->keepalive_packet_rx, 1, raw_len); + throughput_metrics_inc(&g_metrics->downlink_keepalive_packet_rx, 1, raw_len); marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); continue; } @@ -379,6 +388,7 @@ int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, voi throughput_metrics_inc(&g_metrics->hit_block_policy, 1, action_bytes); break; case RAW_PKT_HIT_FORWARD: + throughput_metrics_inc(&g_metrics->steering_tx, 1, action_bytes); throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, action_bytes); break; } @@ -442,6 +452,13 @@ int packet_io_polling_endpoint(struct packet_io *handle, int thread_seq, void *c int data_len = marsio_buff_datalen(rx_buff); throughput_metrics_inc(&g_metrics->dev_endpoint_rx, 1, data_len); + if (is_upstream_keepalive_packet(rx_buff)) + { + throughput_metrics_inc(&g_metrics->uplink_keepalive_packet_rx, 1, data_len); + marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); + continue; + } + int action_bytes = 0; enum inject_pkt_action action = handle_inject_packet(handle, rx_buff, thread_seq, ctx, &action_bytes); assert(action_bytes > 0); @@ -450,13 +467,20 @@ int packet_io_polling_endpoint(struct packet_io *handle, int thread_seq, void *c case INJT_PKT_ERR_DROP: throughput_metrics_inc(&g_metrics->dev_endpoint_err_drop, 1, action_bytes); break; + case INJT_PKT_MIRR_RX_DROP: + throughput_metrics_inc(&g_metrics->mirroring_rx_drop, 1, data_len); // use data_len + break; case INJT_PKT_HIT_BLOCK: + throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len throughput_metrics_inc(&g_metrics->hit_block_policy, 1, action_bytes); break; - case INJT_PKT_HIT_FWD2SF: // forward to next service function + case INJT_PKT_HIT_FWD2SF: // forward to next service function + throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len + throughput_metrics_inc(&g_metrics->steering_tx, 1, action_bytes); // use action_bytes throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, action_bytes); break; - case INJT_PKT_HIT_FWD2NF: // forward to network function + case INJT_PKT_HIT_FWD2NF: // forward to network function + throughput_metrics_inc(&g_metrics->steering_rx, 1, data_len); // use data_len throughput_metrics_inc(&g_metrics->dev_nf_interface_tx, 1, action_bytes); break; } @@ -724,6 +748,8 @@ static enum raw_pkt_action handle_raw_packet(struct packet_io *handle, marsio_bu { int nsend = 0; struct thread_ctx *thread = (struct thread_ctx *)ctx; + struct global_metrics *g_metrics = thread->ref_metrics; + int raw_len = marsio_buff_datalen(rx_buff); *action_bytes = 0; @@ -805,43 +831,70 @@ static enum raw_pkt_action handle_raw_packet(struct packet_io *handle, marsio_bu return RAW_PKT_ERR_BYPASS; } - // rx_buff : not include g_vxlan header - // return + : send n bytes - // return -1 : error - nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx); - if (nsend > 0) + if (node->sff_forward_type == FORWARD_TYPE_STEERING) { - throughput_metrics_inc(&node->tx, 1, nsend); - *action_bytes = nsend; - return RAW_PKT_HIT_FORWARD; + // rx_buff : not include g_vxlan header + // return + : send n bytes + // return -1 : error + nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx); + if (nsend > 0) + { + throughput_metrics_inc(&node->tx, 1, nsend); + *action_bytes = nsend; + return RAW_PKT_HIT_FORWARD; + } + else + { + LOG_ERROR("%s: processing raw packet, session %lu %s forwarding packet to service function failed, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); + marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); + *action_bytes = raw_len; + return RAW_PKT_ERR_BYPASS; + } } else { - LOG_ERROR("%s: processing raw packet, session %lu %s forwarding packet to service function failed, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); - marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); - *action_bytes = raw_len; - return RAW_PKT_ERR_BYPASS; + // rx_buff : not include g_vxlan header + // return + : send n bytes + // return -1 : error + nsend = mirror_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx); + if (nsend > 0) + { + throughput_metrics_inc(&node->tx, 1, nsend); + throughput_metrics_inc(&g_metrics->mirroring_tx, 1, nsend); + throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, nsend); + continue; + } + else + { + LOG_ERROR("%s: processing raw packet, session %lu %s mirroring packet to service function failed, bypass !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); + marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); + *action_bytes = raw_len; + return RAW_PKT_ERR_BYPASS; + } } + default: continue; } } - // BYPASS ALL SF - LOG_INFO("%s: session %lu %s bypass all service function", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); + // BYPASS ALL SF or LAST SF IS MIRRORING marsio_send_burst(handle->dev_nf_interface.mr_path, thread_seq, &rx_buff, 1); *action_bytes = raw_len; return RAW_PKT_HIT_BYPASS; } // return : INJT_PKT_ERR_DROP +// return : INJT_PKT_MIRR_RX_DROP // return : INJT_PKT_HIT_BLOCK // return : INJT_PKT_HIT_FWD2SF // return : INJT_PKT_HIT_FWD2NF static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, marsio_buff_t *rx_buff, int thread_seq, void *ctx, int *action_bytes) { int nsend = 0; + int mbuff_is_adj = 0; struct thread_ctx *thread = (struct thread_ctx *)ctx; + struct global_metrics *g_metrics = thread->ref_metrics; struct g_vxlan *g_vxlan_hdr = NULL; int raw_len = marsio_buff_datalen(rx_buff); @@ -908,7 +961,15 @@ static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, mar return INJT_PKT_ERR_DROP; } - throughput_metrics_inc(&chaining->chaining[sf_index].rx, 1, meta.raw_len); + if (chaining->chaining[sf_index].sff_forward_type == FORWARD_TYPE_MIRRORING) + { + LOG_DEBUG("%s: unexpected inject packet, session %lu %s with sf_profile_id %d executes mirror and does not require reflow, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, chaining->chaining[sf_index].sf_profile_id); + marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); + *action_bytes = raw_len; + return INJT_PKT_MIRR_RX_DROP; + } + + throughput_metrics_inc(&chaining->chaining[sf_index].rx, 1, raw_len); int next_sf_index; for (next_sf_index = sf_index + 1; next_sf_index < chaining->chaining_used; next_sf_index++) @@ -940,25 +1001,55 @@ static enum inject_pkt_action handle_inject_packet(struct packet_io *handle, mar *action_bytes = raw_len; return INJT_PKT_ERR_DROP; } - marsio_buff_adj(rx_buff, raw_len - meta.raw_len); - // rx_buff : not include g_vxlan header - // return + : send n bytes - // return -1 : error - nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx); - if (nsend > 0) + if (mbuff_is_adj == 0) { - throughput_metrics_inc(&node->tx, 1, nsend); - *action_bytes = nsend; - return INJT_PKT_HIT_FWD2SF; + marsio_buff_adj(rx_buff, raw_len - meta.raw_len); + mbuff_is_adj = 1; + } + + if (node->sff_forward_type == FORWARD_TYPE_STEERING) + { + // rx_buff : not include g_vxlan header + // return + : send n bytes + // return -1 : error + nsend = forward_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx); + if (nsend > 0) + { + throughput_metrics_inc(&node->tx, 1, nsend); + *action_bytes = nsend; + return INJT_PKT_HIT_FWD2SF; + } + else + { + LOG_ERROR("%s: processing inject packet, session %lu %s forwarding packet to service function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); + marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); + *action_bytes = raw_len; + return INJT_PKT_ERR_DROP; + } } else { - LOG_ERROR("%s: processing inject packet, session %lu %s forwarding packet to service function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); - marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); - *action_bytes = raw_len; - return INJT_PKT_ERR_DROP; + // rx_buff : not include g_vxlan header + // return + : send n bytes + // return -1 : error + nsend = mirror_packet_to_sf(handle, rx_buff, &meta, node, thread_seq, ctx); + if (nsend > 0) + { + throughput_metrics_inc(&node->tx, 1, nsend); + throughput_metrics_inc(&g_metrics->mirroring_tx, 1, nsend); + throughput_metrics_inc(&g_metrics->dev_endpoint_tx, 1, nsend); + continue; + } + else + { + LOG_ERROR("%s: processing inject packet, session %lu %s mirroring packet to service function failed, drop !!!", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); + marsio_buff_free(handle->instance, &rx_buff, 1, 0, thread_seq); + *action_bytes = raw_len; + return INJT_PKT_ERR_DROP; + } } + default: assert(0); continue; @@ -1027,6 +1118,32 @@ static int forward_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff return raw_len; } +// rx_buff : not include g_vxlan header +// return + : send n bytes +// return -1 : error +static int mirror_packet_to_sf(struct packet_io *handle, marsio_buff_t *rx_buff, struct metadata *meta, struct selected_sf *sf, int thread_seq, void *ctx) +{ + marsio_buff_t *new_buff = NULL; + if (marsio_buff_malloc_global(handle->instance, &new_buff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0) + { + LOG_ERROR("%s: unable to malloc buff on marsio instance, thread_seq: %d", LOG_TAG_PKTIO, thread_seq); + return -1; + } + + unsigned int raw_len = marsio_buff_datalen(rx_buff); + const char *raw_data = marsio_buff_mtod(rx_buff); + char *copy_ptr = marsio_buff_append(new_buff, raw_len); + memcpy(copy_ptr, raw_data, raw_len); + + int nsend = forward_packet_to_sf(handle, new_buff, meta, sf, thread_seq, ctx); + if (nsend == -1) + { + marsio_buff_free(handle->instance, &new_buff, 1, 0, thread_seq); + } + + return nsend; +} + // rx_buff : include g_vxlan header // return + : send n bytes // return -1 : error @@ -1145,6 +1262,7 @@ static int forward_all_sf_packet_to_nf(struct packet_io *handle, marsio_buff_t * struct g_vxlan *g_vxlan_hdr = NULL; int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); + if (g_vxlan_decode(&g_vxlan_hdr, raw_data, raw_len) == -1) { LOG_ERROR("%s: unexpected inject packet, not a vxlan-encapsulated packet, drop !!!", LOG_TAG_PKTIO); @@ -1276,6 +1394,13 @@ static int handle_session_closing(struct metadata *meta, struct ctrl_pkt_parser { struct session_ctx *s_ctx = (struct session_ctx *)node->val_data; LOG_INFO("%s: session %lu %s closing", LOG_TAG_PKTIO, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string); + struct selected_chaining *chaining = s_ctx->chaining; + + for (int i = 0; i < chaining->chaining_used; i++) + { + struct selected_sf *node = &(chaining->chaining[i]); + LOG_INFO("%s: session %lu %s metrics log: policy %d sff_profile_id %d sf_profile_id %d sf_need_skip %d sf_action_reason %s rx_pkts %lu rx_bytes %lu tx_pkts %lu tx_bytes %lu", LOG_TAG_METRICS, s_ctx->session_id, s_ctx->first_ctrl_pkt.addr_string, node->policy_id, node->sff_profile_id, node->sf_profile_id, node->sf_need_skip, session_action_reason_to_string(node->sf_action_reason), node->rx.n_pkts, node->rx.n_bytes, node->tx.n_pkts, node->tx.n_bytes); + } // TODO send log to firewall @@ -1360,11 +1485,11 @@ static void session_value_free_cb(void *ctx) // return 0 : not keepalive packet // return 1 : is keepalive packet -static int marsio_buff_is_keepalive(marsio_buff_t *rx_buff) +static int is_downstream_keepalive_packet(marsio_buff_t *rx_buff) { int raw_len = marsio_buff_datalen(rx_buff); char *raw_data = marsio_buff_mtod(rx_buff); - if (raw_data == NULL || raw_len == 0) + if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr))) { return 0; } @@ -1378,4 +1503,36 @@ static int marsio_buff_is_keepalive(marsio_buff_t *rx_buff) { return 0; } +} + +// return 0 : not keepalive packet +// return 1 : is keepalive packet +static int is_upstream_keepalive_packet(marsio_buff_t *rx_buff) +{ + int raw_len = marsio_buff_datalen(rx_buff); + char *raw_data = marsio_buff_mtod(rx_buff); + if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr) + sizeof(struct ip) + sizeof(struct udp_hdr))) + { + return 0; + } + + struct ethhdr *eth_hdr = (struct ethhdr *)raw_data; + if (eth_hdr->h_proto != htons(ETH_P_IP)) + { + return 0; + } + + struct ip *ip_hdr = (struct ip *)((char *)eth_hdr + sizeof(struct ethhdr)); + if (ip_hdr->ip_p != IPPROTO_UDP) + { + return 0; + } + + struct udp_hdr *udp_hdr = (struct udp_hdr *)((char *)ip_hdr + sizeof(struct ip)); + if (udp_hdr->uh_dport != htons(3784)) + { + return 0; + } + + return 1; } \ No newline at end of file diff --git a/platform/src/policy.cpp b/platform/src/policy.cpp index 012fda9..df9e422 100644 --- a/platform/src/policy.cpp +++ b/platform/src/policy.cpp @@ -1408,7 +1408,7 @@ void selected_chaining_bref(struct selected_chaining *chaining) struct selected_sf *node = &(chaining->chaining[i]); if (buff_size - buff_used > 0) { - buff_used += snprintf(buff + buff_used, buff_size - buff_used, "\"node[%d]\":{\"skip\":%d,\"reason\":\"%s\",\"policy_id\":%d,\"sff_profile_id\":%d,\"sf_profile_id\":%d}, ", i, node->sf_need_skip, session_action_reason_to_string(node->sf_action_reason), node->policy_id, node->sff_profile_id, node->sf_profile_id); + buff_used += snprintf(buff + buff_used, buff_size - buff_used, "\"node[%d]\":{\"skip\":%d,\"reason\":\"%s\",\"policy_id\":%d,\"sff_profile_id\":%d,\"sf_profile_id\":%d,\"sff_forward_type\":\"%s\"}, ", i, node->sf_need_skip, session_action_reason_to_string(node->sf_action_reason), node->policy_id, node->sff_profile_id, node->sf_profile_id, forward_type_to_string(node->sff_forward_type)); } } LOG_DEBUG("%s: selected_chaining_bref: %s}", LOG_TAG_POLICY, buff); @@ -1457,12 +1457,12 @@ void policy_enforce_select_chaining(struct selected_chaining *chaining, struct p continue; } item->sff_forward_type = sff_param->sff_forward_type; - LOG_DEBUG("%s: chaining policy %d -> sff_profile %d sf_profile_ids_num %d (before filter nearby and sctive)", LOG_TAG_POLICY, policy_id, item->sff_profile_id, sff_param->sf_profile_ids_num); + LOG_DEBUG("%s: chaining policy %d -> sff_profile %d sf_profile_ids_num %d (before filter nearby and active)", LOG_TAG_POLICY, policy_id, item->sff_profile_id, sff_param->sf_profile_ids_num); memset(&array, 0, sizeof(array)); fixed_num_array_init(&array); select_sf_by_nearby_and_active(enforcer, sff_param, &array); - LOG_DEBUG("%s: chaining policy %d -> sff_profile %d sf_profile_ids_num %d (after filter nearby and sctive)", LOG_TAG_POLICY, policy_id, item->sff_profile_id, fixed_num_array_count_elem(&array)); + LOG_DEBUG("%s: chaining policy %d -> sff_profile %d sf_profile_ids_num %d (after filter nearby and active)", LOG_TAG_POLICY, policy_id, item->sff_profile_id, fixed_num_array_count_elem(&array)); if (fixed_num_array_count_elem(&array) == 0) { LOG_DEBUG("%s: chaining policy %d -> sff_profile %d, no sf available after filtering by 'nearby & active', bypass current sff !!!", LOG_TAG_POLICY, policy_id, item->sff_profile_id);