TSG-13630 tsg-service-chaining-engine使用mrzcpd捕获报文/回注报文

TSG-13685 tsg-service-chaining-engine使用VXLAN封装Steering/Mirroring的Package
This commit is contained in:
luwenpeng
2023-02-10 14:22:40 +08:00
parent 158e4e89e8
commit 737ca3d4be
23 changed files with 2397 additions and 358 deletions

View File

@@ -1,4 +1,83 @@
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
#include "sce.h"
#include "log.h"
#include "utils.h"
static void *worker_thread_cycle(void *arg)
{
struct thread_ctx *thread_ctx = (struct thread_ctx *)arg;
struct packet_io *handle = thread_ctx->ref_io;
int n_packet_recv;
LOG_INFO("%s: worker thread %d running", LOG_TAG_SCE, thread_ctx->thread_index);
while (1)
{
n_packet_recv = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx);
if (n_packet_recv)
{
LOG_INFO("%s: worker thread %d recv %03d packets from nf_interface", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv);
}
n_packet_recv = packet_io_polling_endpoint(handle, thread_ctx->thread_index, thread_ctx);
if (n_packet_recv)
{
LOG_INFO("%s: worker thread %d recv %03d packets from endpoint", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv);
}
// TODO reset session_table
}
LOG_ERROR("%s: worker thread %d exiting", LOG_TAG_SCE, thread_ctx->thread_index);
return (void *)NULL;
}
int main(int argc, char **argv)
{
const char *profile = "./conf/sce.conf";
struct sce_ctx *ctx = sce_ctx_create(profile);
if (ctx == NULL)
{
return -1;
}
for (int i = 0; i < ctx->nr_worker_threads; i++)
{
ctx->work_threads[i].tid = 0;
ctx->work_threads[i].thread_index = i;
ctx->work_threads[i].session_table = session_table_create();
ctx->work_threads[i].ref_io = ctx->io;
ctx->work_threads[i].ref_metrics = ctx->metrics;
ctx->work_threads[i].ref_enforcer = ctx->enforcer;
}
for (int i = 0; i < ctx->nr_worker_threads; i++)
{
struct thread_ctx *thread_ctx = &ctx->work_threads[i];
if (pthread_create(&thread_ctx->tid, NULL, worker_thread_cycle, (void *)thread_ctx) < 0)
{
LOG_ERROR("%s: unable to create worker thread %d, error %d: %s", LOG_TAG_SCE, i, errno, strerror(errno));
goto error_out;
}
}
while (1)
{
sleep(20);
global_metrics_dump(ctx->metrics);
}
error_out:
for (int i = 0; i < ctx->nr_worker_threads; i++)
{
struct thread_ctx *thread_ctx = &ctx->work_threads[i];
session_table_destory(thread_ctx->session_table);
}
sce_ctx_destory(ctx);
return 0;
}
}

1203
platform/src/packet_io.cpp Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1103,6 +1103,22 @@ static enum session_action select_sf_by_ldbc(uint64_t hash, struct sff_param *sf
return SESSION_ACTION_BYPASS;
}
static void selected_sf_init(struct selected_sf *item)
{
if (item)
{
item->policy_id = -1;
item->traffic_type = TRAFFIC_TYPE_NONE;
item->sff_profile_id = -1;
item->sff_forward_type = FORWARD_TYPE_NONE;
item->sf_need_skip = 0;
item->sf_profile_id = -1;
item->sf_action = SESSION_ACTION_BYPASS;
item->sf_action_reason = ACTION_BYPASS_DUE_DEFAULT;
memset(&item->sf_connectivity, 0, sizeof(struct connectivity));
}
}
/******************************************************************************
* Public API
******************************************************************************/
@@ -1298,21 +1314,15 @@ struct selected_chaining *selected_chaining_create(int chaining_size)
{
struct selected_chaining *chaining = (struct selected_chaining *)calloc(1, sizeof(struct selected_chaining));
assert(chaining);
chaining->policy_id = -1;
chaining->traffic_type = TRAFFIC_TYPE_NONE;
chaining->chaining_index = 0;
chaining->chaining_used = 0;
chaining->chaining_size = chaining_size;
chaining->chaining = (struct selected_sf *)calloc(chaining->chaining_size, sizeof(struct selected_sf));
assert(chaining->chaining);
for (int i = 0; i < chaining->chaining_size; i++)
{
struct selected_sf *elem = &(chaining->chaining[i]);
elem->sff_profile_id = -1;
elem->sff_forward_type = FORWARD_TYPE_NONE;
elem->sf_profile_id = -1;
elem->sf_action = SESSION_ACTION_BYPASS;
elem->sf_action_reason = ACTION_BYPASS_DUE_DEFAULT;
struct selected_sf *item = &(chaining->chaining[i]);
selected_sf_init(item);
}
return chaining;
@@ -1340,18 +1350,20 @@ void selected_chaining_dump(struct selected_chaining *chaining)
return;
}
LOG_DEBUG("%s: selected_chaining->policy_id : %d", LOG_TAG_POLICY, chaining->policy_id);
LOG_DEBUG("%s: selected_chaining->traffic_type : %s", LOG_TAG_POLICY, traffic_type_to_string(chaining->traffic_type));
LOG_DEBUG("%s: selected_chaining->chaining_size : %d", LOG_TAG_POLICY, chaining->chaining_size);
LOG_DEBUG("%s: selected_chaining->chaining_used : %d", LOG_TAG_POLICY, chaining->chaining_used);
for (int i = 0; i < chaining->chaining_size; i++)
for (int i = 0; i < chaining->chaining_used; i++)
{
struct selected_sf *node = &(chaining->chaining[i]);
LOG_DEBUG("%s: selected_chaining->node[%d]->policy_id : %d", LOG_TAG_POLICY, i, node->policy_id);
LOG_DEBUG("%s: selected_chaining->node[%d]->traffic_type : %s", LOG_TAG_POLICY, i, traffic_type_to_string(node->traffic_type));
// sff
LOG_DEBUG("%s: selected_chaining->node[%d]->sff_profile_id : %d", LOG_TAG_POLICY, i, node->sff_profile_id);
LOG_DEBUG("%s: selected_chaining->node[%d]->sff_forward_type : %s", LOG_TAG_POLICY, i, forward_type_to_string(node->sff_forward_type));
// sf
LOG_DEBUG("%s: selected_chaining->node[%d]->sf_profile_id : %d", LOG_TAG_POLICY, i, node->sf_profile_id);
LOG_DEBUG("%s: selected_chaining->node[%d]->sf_need_skip : %d", LOG_TAG_POLICY, i, node->sf_need_skip);
LOG_DEBUG("%s: selected_chaining->node[%d]->sf_action : %s", LOG_TAG_POLICY, i, session_action_to_string(node->sf_action));
LOG_DEBUG("%s: selected_chaining->node[%d]->sf_action_reason : %s", LOG_TAG_POLICY, i, session_action_reason_to_string(node->sf_action_reason));
LOG_DEBUG("%s: selected_chaining->node[%d]->sf_connectivity->package_method : %s", LOG_TAG_POLICY, i, package_method_to_string(node->sf_connectivity.method));
@@ -1371,48 +1383,43 @@ void selected_chaining_bref(struct selected_chaining *chaining)
char buff[4096] = {0};
int buff_used = 0;
int buff_size = sizeof(buff);
buff_used += snprintf(buff + buff_used, buff_size - buff_used, "policy_id:%d, chaining_size:%d, ", chaining->policy_id, chaining->chaining_size);
for (int i = 0; i < chaining->chaining_size; i++)
buff_used += snprintf(buff + buff_used, buff_size - buff_used, "chaining_size:%d, chaining_used:%d, {", chaining->chaining_size, chaining->chaining_used);
for (int i = 0; i < chaining->chaining_used; i++)
{
struct selected_sf *node = &(chaining->chaining[i]);
if (buff_size - buff_used > 0)
{
buff_used += snprintf(buff + buff_used, buff_size - buff_used, "node[%d]={%s:%d:%d} ", i, session_action_reason_to_string(node->sf_action_reason), node->sff_profile_id, node->sf_profile_id);
buff_used += snprintf(buff + buff_used, buff_size - buff_used, "\"node[%d]\":{\"skip\":%d,\"reason\":\"%s\",\"policy_id\":%d,\"sff_profile_id\":%d,\"sf_profile_id\":%d}, ", i, node->sf_need_skip, session_action_reason_to_string(node->sf_action_reason), node->policy_id, node->sff_profile_id, node->sf_profile_id);
}
}
LOG_DEBUG("%s: selected_chaining_bref: %s", LOG_TAG_POLICY, buff);
LOG_DEBUG("%s: selected_chaining_bref: %s}", LOG_TAG_POLICY, buff);
}
// return NULL: NEED BYPASS ALL SFF
// return !NULL:
struct selected_chaining *policy_enforce_select_chaining(struct policy_enforcer *enforcer, struct raw_pkt_parser *parser, int policy_id, int dir_is_internal)
void policy_enforce_select_chaining(struct selected_chaining *chaining, struct policy_enforcer *enforcer, struct raw_pkt_parser *parser, int policy_id, int dir_is_internal)
{
uint64_t hash_value = 0;
char buffer[16] = {0};
struct sf_param *sf_param = NULL;
struct sff_param *sff_param = NULL;
struct fixed_num_array array = {0};
struct selected_chaining *chaining = NULL;
struct chaining_param *chaining_param = NULL;
snprintf(buffer, sizeof(buffer), "%d", policy_id);
chaining_param = (struct chaining_param *)Maat_plugin_get_EX_data(enforcer->maat, enforcer->compile_table_id, buffer);
if (chaining_param == NULL)
{
LOG_ERROR("%s: failed to get chaining parameter of policy %d, bypass !!!", LOG_TAG_POLICY, policy_id);
// BYPASS ALL SFF
return NULL;
LOG_ERROR("%s: failed to get chaining parameter of policy %d", LOG_TAG_POLICY, policy_id);
return;
}
LOG_DEBUG("%s: enforce chaining policy %d", LOG_TAG_POLICY, policy_id);
chaining = selected_chaining_create(chaining_param->sff_profile_ids_num);
assert(chaining);
chaining->policy_id = policy_id;
chaining->traffic_type = chaining_param->traffic_type;
for (int i = 0; i < chaining_param->sff_profile_ids_num; i++)
for (int i = 0; i < chaining_param->sff_profile_ids_num && chaining->chaining_used < chaining->chaining_size; i++)
{
struct selected_sf *item = &(chaining->chaining[chaining->chaining_index]);
struct selected_sf *item = &(chaining->chaining[chaining->chaining_used]);
selected_sf_init(item);
item->policy_id = policy_id;
item->traffic_type = chaining_param->traffic_type;
item->sff_profile_id = chaining_param->sff_profile_ids[i];
memset(buffer, 0, sizeof(buffer));
@@ -1423,7 +1430,7 @@ struct selected_chaining *policy_enforce_select_chaining(struct policy_enforcer
LOG_ERROR("%s: failed to get sff parameter of profile %d, bypass current sff !!!", LOG_TAG_POLICY, item->sff_profile_id);
item->sf_action = SESSION_ACTION_BYPASS;
item->sf_action_reason = ACTION_BYPASS_DUE_INVALID_POLICY;
chaining->chaining_index++;
chaining->chaining_used++;
continue;
}
item->sff_forward_type = sff_param->sff_forward_type;
@@ -1436,7 +1443,7 @@ struct selected_chaining *policy_enforce_select_chaining(struct policy_enforcer
LOG_DEBUG("%s: chaining policy %d -> sff_profile %d, no sf available after filtering by 'nearby & active', bypass current sff !!!", LOG_TAG_POLICY, policy_id, item->sff_profile_id);
item->sf_action = SESSION_ACTION_BYPASS;
item->sf_action_reason = ACTION_BYPASS_DUE_NO_AVAILABLE_SF;
chaining->chaining_index++;
chaining->chaining_used++;
sff_param_free(sff_param);
continue;
}
@@ -1445,7 +1452,7 @@ struct selected_chaining *policy_enforce_select_chaining(struct policy_enforcer
item->sf_action = select_sf_by_ldbc(hash_value, sff_param, &array, &(item->sf_profile_id), &(item->sf_action_reason));
if (item->sf_action != SESSION_ACTION_FORWARD)
{
chaining->chaining_index++;
chaining->chaining_used++;
sff_param_free(sff_param);
continue;
}
@@ -1458,7 +1465,7 @@ struct selected_chaining *policy_enforce_select_chaining(struct policy_enforcer
LOG_ERROR("%s: failed to get sf parameter of selected profile %d, bypass current sff !!!", LOG_TAG_POLICY, item->sf_profile_id);
item->sf_action = SESSION_ACTION_BYPASS;
item->sf_action_reason = ACTION_BYPASS_DUE_INVALID_POLICY;
chaining->chaining_index++;
chaining->chaining_used++;
sff_param_free(sff_param);
continue;
}
@@ -1467,13 +1474,27 @@ struct selected_chaining *policy_enforce_select_chaining(struct policy_enforcer
item->sf_connectivity.int_vlan_tag = sf_param->sf_connectivity.int_vlan_tag;
item->sf_connectivity.ext_vlan_tag = sf_param->sf_connectivity.ext_vlan_tag;
memcpy(item->sf_connectivity.dest_ip, sf_param->sf_connectivity.dest_ip, strlen(sf_param->sf_connectivity.dest_ip));
chaining->chaining_index++;
chaining->chaining_used++;
sf_param_free(sf_param);
sff_param_free(sff_param);
}
chaining_param_free(chaining_param);
// Selected Service Chaining Before Unique : [1,2,3,1,2]
// Selected Service Chaining After Unique : [1,2,3]
for (int i = 0; i < chaining->chaining_used; i++)
{
struct selected_sf *node_i = &(chaining->chaining[i]);
for (int j = 0; j < i; j++)
{
struct selected_sf *node_j = &(chaining->chaining[j]);
if (node_i->sf_profile_id == node_j->sf_profile_id)
{
node_i->sf_need_skip = 1;
break;
}
}
}
return chaining;
chaining_param_free(chaining_param);
}

133
platform/src/sce.cpp Normal file
View File

@@ -0,0 +1,133 @@
#include <assert.h>
#include <MESA/MESA_prof_load.h>
#include "sce.h"
#include "log.h"
/******************************************************************************
* global_metrics
******************************************************************************/
struct global_metrics *global_metrics_create()
{
struct global_metrics *metrics = (struct global_metrics *)calloc(1, sizeof(struct global_metrics));
assert(metrics == NULL);
return metrics;
}
void global_metrics_destory(struct global_metrics *metrics)
{
if (metrics)
{
free(metrics);
metrics = NULL;
}
}
void global_metrics_dump(struct global_metrics *metrics)
{
if (metrics)
{
LOG_INFO("%s: dev_endpoint_rx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_rx.n_pkts, metrics->dev_endpoint_rx.n_bytes);
LOG_INFO("%s: dev_endpoint_tx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_tx.n_pkts, metrics->dev_endpoint_tx.n_bytes);
LOG_INFO("%s: dev_endpoint_err_drop : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_endpoint_err_drop.n_pkts, metrics->dev_endpoint_err_drop.n_bytes);
LOG_INFO("%s: dev_nf_interface_rx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_rx.n_pkts, metrics->dev_nf_interface_rx.n_bytes);
LOG_INFO("%s: dev_nf_interface_tx : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_tx.n_pkts, metrics->dev_nf_interface_tx.n_bytes);
LOG_INFO("%s: dev_nf_interface_err_bypass : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->dev_nf_interface_err_bypass.n_pkts, metrics->dev_nf_interface_err_bypass.n_bytes);
LOG_INFO("%s: hit_block_policy : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->hit_block_policy.n_pkts, metrics->hit_block_policy.n_bytes);
LOG_INFO("%s: hit_bypass_policy : n_pkts : %6lu, n_bytes: %6lu", LOG_TAG_METRICS, metrics->hit_bypass_policy.n_pkts, metrics->hit_bypass_policy.n_bytes);
LOG_INFO("%s: current_session_num : %6lu", LOG_TAG_METRICS, metrics->session_nums);
}
}
/******************************************************************************
* session_ctx
******************************************************************************/
struct session_ctx *session_ctx_new()
{
struct session_ctx *ctx = (struct session_ctx *)calloc(1, sizeof(struct session_ctx));
assert(ctx != NULL);
return ctx;
}
void session_ctx_free(struct session_ctx *ctx)
{
if (ctx)
{
if (ctx->first_ctrl_pkt.addr_string)
{
free(ctx->first_ctrl_pkt.addr_string);
ctx->first_ctrl_pkt.addr_string = NULL;
}
if (ctx->first_ctrl_pkt.header_data)
{
free(ctx->first_ctrl_pkt.header_data);
ctx->first_ctrl_pkt.header_data = NULL;
}
if (ctx->chaining)
{
selected_chaining_destory(ctx->chaining);
ctx->chaining = NULL;
}
free(ctx);
ctx = 0;
}
}
/******************************************************************************
* sce_ctx
******************************************************************************/
struct sce_ctx *sce_ctx_create(const char *profile)
{
struct sce_ctx *ctx = (struct sce_ctx *)calloc(1, sizeof(struct sce_ctx));
MESA_load_profile_int_def(profile, "system", "firewall_sids", (int *)&(ctx->firewall_sids), 1001);
MESA_load_profile_int_def(profile, "system", "nr_worker_threads", (int *)&(ctx->nr_worker_threads), 8);
ctx->nr_worker_threads = MIN(ctx->nr_worker_threads, (int)(sizeof(ctx->work_threads) / sizeof(ctx->work_threads[0])));
ctx->io = packet_io_create(profile, ctx->nr_worker_threads);
if (ctx->io == NULL)
{
goto error_out;
}
ctx->metrics = global_metrics_create();
if (ctx->metrics == NULL)
{
goto error_out;
}
ctx->enforcer = policy_enforcer_create("SCE", profile, ctx->nr_worker_threads, NULL);
if (ctx->enforcer == NULL)
{
goto error_out;
}
return ctx;
error_out:
sce_ctx_destory(ctx);
return NULL;
}
void sce_ctx_destory(struct sce_ctx *ctx)
{
if (ctx)
{
policy_enforcer_destory(ctx->enforcer);
global_metrics_destory(ctx->metrics);
packet_io_destory(ctx->io);
free(ctx);
ctx = NULL;
}
}