diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index 6b1e060..b9ba9dd 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(common src/addr_tuple4.cpp src/session_table.cpp src/raw_packet.cpp src/ctrl_packet.cpp src/bfd.cpp src/utils.cpp src/g_vxlan.cpp src/log.cpp) +add_library(common src/addr_tuple4.cpp src/session_table.cpp src/raw_packet.cpp src/ctrl_packet.cpp src/bfd.cpp src/utils.cpp src/g_vxlan.cpp src/log.cpp src/timestamp.cpp) target_link_libraries(common PUBLIC cjson) target_link_libraries(common PUBLIC mrzcpd) target_link_libraries(common PUBLIC MESA_handle_logger) diff --git a/common/include/timestamp.h b/common/include/timestamp.h new file mode 100644 index 0000000..b3251f6 --- /dev/null +++ b/common/include/timestamp.h @@ -0,0 +1,24 @@ +#ifndef _TIMESTAMP_H +#define _TIMESTAMP_H + +#ifdef __cpluscplus +extern "C" +{ +#endif + +#include "utils.h" + +struct timestamp *timestamp_new(uint64_t update_interval_ms); +void timestamp_free(struct timestamp *ts); + +void timestamp_update(struct timestamp *ts); +uint64_t timestamp_update_interval_ms(struct timestamp *ts); + +uint64_t timestamp_get_sec(struct timestamp *ts); +uint64_t timestamp_get_msec(struct timestamp *ts); + +#ifdef __cpluscplus +} +#endif + +#endif diff --git a/common/include/utils.h b/common/include/utils.h index 99f5f47..f5de124 100644 --- a/common/include/utils.h +++ b/common/include/utils.h @@ -18,6 +18,14 @@ extern "C" #define LOG_TAG_SF_METRICS "SF_METRICS" #define LOG_TAG_SF_STATUS "SF_STATUS" #define LOG_TAG_SCE "SCE" +#define LOG_TAG_TIMESTAMP "TIMESTAMP" + +#define ATOMIC_INC(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED) +#define ATOMIC_DEC(x) __atomic_fetch_sub(x, 1, __ATOMIC_RELAXED) +#define ATOMIC_READ(x) __atomic_fetch_add(x, 0, __ATOMIC_RELAXED) +#define ATOMIC_ZERO(x) __atomic_fetch_and(x, 0, __ATOMIC_RELAXED) +#define ATOMIC_ADD(x, y) __atomic_fetch_add(x, y, __ATOMIC_RELAXED) +#define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED) /****************************************************************************** * fixed_num_array diff --git a/common/src/timestamp.cpp b/common/src/timestamp.cpp new file mode 100644 index 0000000..6dc0cbb --- /dev/null +++ b/common/src/timestamp.cpp @@ -0,0 +1,65 @@ +#include +#include + +#include "log.h" +#include "timestamp.h" + +// 1 s = 1000 ms +// 1 ms = 1000 us +// 1 us = 1000 ns + +struct timestamp +{ + struct timespec timestamp; + uint64_t update_interval_ms; +}; + +struct timestamp *timestamp_new(uint64_t update_interval_ms) +{ + struct timestamp *ts = (struct timestamp *)calloc(1, sizeof(struct timestamp)); + ts->update_interval_ms = update_interval_ms; + + timestamp_update(ts); + LOG_DEBUG("%s: TIMESTAMP->update_interval_ms : %lu", LOG_TAG_TIMESTAMP, timestamp_update_interval_ms(ts)); + LOG_DEBUG("%s: TIMESTAMP->current_sec : %lu", LOG_TAG_TIMESTAMP, timestamp_get_sec(ts)); + LOG_DEBUG("%s: TIMESTAMP->current_msec : %lu", LOG_TAG_TIMESTAMP, timestamp_get_msec(ts)); + + return ts; +} + +void timestamp_free(struct timestamp *ts) +{ + if (ts) + { + free(ts); + ts = NULL; + } +} + +void timestamp_update(struct timestamp *ts) +{ + struct timespec temp; + clock_gettime(CLOCK_MONOTONIC, &temp); + ATOMIC_SET(&(ts->timestamp.tv_sec), temp.tv_sec); + ATOMIC_SET(&(ts->timestamp.tv_nsec), temp.tv_nsec); +} + +uint64_t timestamp_update_interval_ms(struct timestamp *ts) +{ + return ts->update_interval_ms; +} + +uint64_t timestamp_get_sec(struct timestamp *ts) +{ + uint64_t sec = ATOMIC_READ(&(ts->timestamp.tv_sec)); + + return sec; +} + +uint64_t timestamp_get_msec(struct timestamp *ts) +{ + uint64_t sec = ATOMIC_READ(&(ts->timestamp.tv_sec)); + uint64_t nsec = ATOMIC_READ(&(ts->timestamp.tv_nsec)); + + return sec * 1000 + nsec / 1000000; +} diff --git a/conf/sce.conf b/conf/sce.conf index 70ac842..f47108b 100644 --- a/conf/sce.conf +++ b/conf/sce.conf @@ -1,9 +1,9 @@ [system] nr_worker_threads=8 -enable_cpu_affinity=1 cpu_affinity_mask=2,3,4-9 firewall_sids=1001 enable_debug=0 +ts_update_interval_ms=1 # Only when (disable_coredump == 1 || (enable_breakpad == 1 && enable_breakpad_upload == 1)) is satisfied, the core will not be generated locally disable_coredump=0 diff --git a/platform/include/packet_io.h b/platform/include/packet_io.h index 4c5b34b..7ab522b 100644 --- a/platform/include/packet_io.h +++ b/platform/include/packet_io.h @@ -6,9 +6,12 @@ extern "C" { #endif -struct packet_io *packet_io_create(const char *profile, int thread_num); +struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask); void packet_io_destory(struct packet_io *handle); +int packet_io_thread_init(struct packet_io *handle, struct thread_ctx *thread_ctx); +void packet_io_thread_wait(struct packet_io *handle, struct thread_ctx *thread_ctx, int timeout_ms); + int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx); int packet_io_polling_endpoint(struct packet_io *handle, int thread_seq, void *ctx); diff --git a/platform/include/sce.h b/platform/include/sce.h index 49d47d8..2923a41 100644 --- a/platform/include/sce.h +++ b/platform/include/sce.h @@ -6,91 +6,94 @@ extern "C" { #endif +#include + #include "policy.h" +#include "timestamp.h" #include "packet_io.h" #include "session_table.h" #define MAX_THREAD_NUM 128 -/****************************************************************************** - * Struct For Thread - ******************************************************************************/ + /****************************************************************************** + * Struct For Thread + ******************************************************************************/ -struct thread_ctx -{ - pthread_t tid; - int thread_index; - struct session_table *session_table; - struct sf_metrics *sf_metrics; + struct thread_ctx + { + pthread_t tid; + int thread_index; + struct session_table *session_table; + struct sf_metrics *sf_metrics; - struct packet_io *ref_io; - struct global_metrics *ref_metrics; - struct policy_enforcer *ref_enforcer; - struct sce_ctx *ref_sce_ctx; + struct packet_io *ref_io; + struct global_metrics *ref_metrics; + struct policy_enforcer *ref_enforcer; + struct sce_ctx *ref_sce_ctx; - int session_table_need_reset; - int sf_metrics_need_send; - int cpu_mask; -}; + int session_table_need_reset; + }; -/****************************************************************************** - * Struct For Session - ******************************************************************************/ + /****************************************************************************** + * Struct For Session + ******************************************************************************/ -struct packet_info -{ - int dir_is_e2i; - struct addr_tuple4 tuple4; - char *addr_string; + struct packet_info + { + int dir_is_e2i; + struct addr_tuple4 tuple4; + char *addr_string; - char *header_data; - int header_len; + char *header_data; + int header_len; - struct sids sids; - struct route_ctx route_ctx; -}; + struct sids sids; + struct route_ctx route_ctx; + }; -struct session_ctx -{ - struct fixed_num_array policy_ids; - uint64_t session_id; + struct session_ctx + { + struct fixed_num_array policy_ids; + uint64_t session_id; - struct route_ctx raw_pkt_i2e_route_ctx; - struct route_ctx raw_pkt_e2i_route_ctx; + struct route_ctx raw_pkt_i2e_route_ctx; + struct route_ctx raw_pkt_e2i_route_ctx; - struct sids raw_pkt_i2e_sids; - struct sids raw_pkt_e2i_sids; + struct sids raw_pkt_i2e_sids; + struct sids raw_pkt_e2i_sids; - // depending on first control packet - struct packet_info first_ctrl_pkt; - struct selected_chaining *chaining; + // depending on first control packet + struct packet_info first_ctrl_pkt; + struct selected_chaining *chaining; - struct thread_ctx *ref_thread_ctx; -}; + struct thread_ctx *ref_thread_ctx; + }; -struct session_ctx *session_ctx_new(); -void session_ctx_free(struct session_ctx *ctx); + struct session_ctx *session_ctx_new(); + void session_ctx_free(struct session_ctx *ctx); -/****************************************************************************** - * Struct For SCE - ******************************************************************************/ + /****************************************************************************** + * Struct For SCE + ******************************************************************************/ -struct sce_ctx -{ - int enable_debug; - int firewall_sids; - int nr_worker_threads; - int enable_cpu_affinity; - int cpu_affinity_mask[MAX_THREAD_NUM]; + struct sce_ctx + { + int enable_debug; + int firewall_sids; + int nr_worker_threads; + int ts_update_interval_ms; + int cpu_affinity_mask[MAX_THREAD_NUM]; - struct packet_io *io; - struct global_metrics *metrics; - struct policy_enforcer *enforcer; - struct thread_ctx work_threads[MAX_THREAD_NUM]; -}; + cpu_set_t coremask; + struct timestamp *ts; + struct packet_io *io; + struct global_metrics *metrics; + struct policy_enforcer *enforcer; + struct thread_ctx work_threads[MAX_THREAD_NUM]; + }; -struct sce_ctx *sce_ctx_create(const char *profile); -void sce_ctx_destory(struct sce_ctx *ctx); + struct sce_ctx *sce_ctx_create(const char *profile); + void sce_ctx_destory(struct sce_ctx *ctx); #ifdef __cpluscplus } diff --git a/platform/src/main.cpp b/platform/src/main.cpp index 21af7f4..364955d 100644 --- a/platform/src/main.cpp +++ b/platform/src/main.cpp @@ -56,33 +56,39 @@ static void *worker_thread_cycle(void *arg) { struct thread_ctx *thread_ctx = (struct thread_ctx *)arg; struct packet_io *handle = thread_ctx->ref_io; - int n_packet_recv; + struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx; + struct timestamp *ts = sce_ctx->ts; + int timeout_ms = 0; + int n_pkt_recv_from_nf = 0; + int n_pkt_recv_from_endp = 0; char thread_name[16]; + uint64_t sf_metrics_last_send_ts = timestamp_get_msec(ts); + uint64_t sf_metrics_send_interval = sf_metrics_get_interval(thread_ctx->sf_metrics) * 1000; + snprintf(thread_name, sizeof(thread_name), "sce:worker-%d", thread_ctx->thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); - char affinity[32] = {0}; - if (thread_ctx->cpu_mask >= 0) + if (packet_io_thread_init(handle, thread_ctx) != 0) { - thread_set_affinity(thread_ctx->cpu_mask); - snprintf(affinity, sizeof(affinity), "affinity cpu%d", thread_ctx->cpu_mask); + goto error_out; } - LOG_INFO("%s: worker thread %d %s is running", LOG_TAG_SCE, thread_ctx->thread_index, thread_ctx->cpu_mask >= 0 ? affinity : ""); + LOG_INFO("%s: worker thread %d is running", LOG_TAG_SCE, thread_ctx->thread_index); while (1) { - n_packet_recv = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx); - if (n_packet_recv) + n_pkt_recv_from_nf = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx); + n_pkt_recv_from_endp = packet_io_polling_endpoint(handle, thread_ctx->thread_index, thread_ctx); + if (n_pkt_recv_from_nf == 0 && n_pkt_recv_from_endp == 0) { - // LOG_INFO("%s: worker thread %d recv %03d packets from nf_interface", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv); - } + timeout_ms = sf_metrics_last_send_ts + sf_metrics_send_interval - timestamp_get_msec(ts); + if (timeout_ms <= 0) + { + timeout_ms = 0; + } - n_packet_recv = packet_io_polling_endpoint(handle, thread_ctx->thread_index, thread_ctx); - if (n_packet_recv) - { - // LOG_INFO("%s: worker thread %d recv %03d packets from endpoint", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv); + packet_io_thread_wait(handle, thread_ctx, timeout_ms); } if (__atomic_fetch_add(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED) > 0) @@ -91,14 +97,15 @@ static void *worker_thread_cycle(void *arg) __atomic_fetch_and(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED); } - if (__atomic_fetch_add(&thread_ctx->sf_metrics_need_send, 0, __ATOMIC_RELAXED) > 0) + if (timestamp_get_msec(ts) - sf_metrics_last_send_ts >= sf_metrics_send_interval) { sf_metrics_send(thread_ctx->sf_metrics); sf_metrics_reset(thread_ctx->sf_metrics); - __atomic_fetch_and(&thread_ctx->sf_metrics_need_send, 0, __ATOMIC_RELAXED); + sf_metrics_last_send_ts = timestamp_get_msec(ts); } } +error_out: LOG_ERROR("%s: worker thread %d exiting", LOG_TAG_SCE, thread_ctx->thread_index); return (void *)NULL; } @@ -106,6 +113,9 @@ static void *worker_thread_cycle(void *arg) int main(int argc, char **argv) { const char *profile = "./conf/sce.conf"; + uint64_t ts_update_interval = 0; + uint64_t g_metrics_last_send_ts = 0; + uint64_t g_metrics_send_interval = 0; int opt = 0; while ((opt = getopt(argc, argv, "vh")) != -1) @@ -157,8 +167,6 @@ int main(int argc, char **argv) ctx->work_threads[i].ref_enforcer = ctx->enforcer; ctx->work_threads[i].ref_sce_ctx = ctx; ctx->work_threads[i].session_table_need_reset = 0; - ctx->work_threads[i].sf_metrics_need_send = 0; - ctx->work_threads[i].cpu_mask = ctx->enable_cpu_affinity ? ctx->cpu_affinity_mask[i] : -1; } for (int i = 0; i < ctx->nr_worker_threads; i++) @@ -171,34 +179,21 @@ int main(int argc, char **argv) } } - struct timespec current_time; - struct timespec g_metrics_last_send_time; - struct timespec sf_metrics_last_send_time; - - clock_gettime(CLOCK_MONOTONIC, ¤t_time); - clock_gettime(CLOCK_MONOTONIC, &g_metrics_last_send_time); - clock_gettime(CLOCK_MONOTONIC, &sf_metrics_last_send_time); + timestamp_update(ctx->ts); + ts_update_interval = timestamp_update_interval_ms(ctx->ts); + g_metrics_last_send_ts = timestamp_get_msec(ctx->ts); + g_metrics_send_interval = ctx->metrics->config.statsd_cycle * 1000; while (1) { - if (current_time.tv_sec - g_metrics_last_send_time.tv_sec >= ctx->metrics->config.statsd_cycle) + if (timestamp_get_msec(ctx->ts) - g_metrics_last_send_ts >= g_metrics_send_interval) { - clock_gettime(CLOCK_MONOTONIC, &g_metrics_last_send_time); global_metrics_dump(ctx->metrics); + g_metrics_last_send_ts = timestamp_get_msec(ctx->ts); } - if (current_time.tv_sec - sf_metrics_last_send_time.tv_sec >= sf_metrics_get_interval(ctx->work_threads[0].sf_metrics)) - { - clock_gettime(CLOCK_MONOTONIC, &sf_metrics_last_send_time); - for (int i = 0; i < ctx->nr_worker_threads; i++) - { - struct thread_ctx *thread_ctx = &ctx->work_threads[i]; - __atomic_fetch_add(&thread_ctx->sf_metrics_need_send, 1, __ATOMIC_RELAXED); - } - } - - sleep(MIN(ctx->metrics->config.statsd_cycle, sf_metrics_get_interval(ctx->work_threads[0].sf_metrics))); - clock_gettime(CLOCK_MONOTONIC, ¤t_time); + usleep(ts_update_interval * 1000); + timestamp_update(ctx->ts); } error_out: diff --git a/platform/src/packet_io.cpp b/platform/src/packet_io.cpp index b344b38..a4900ae 100644 --- a/platform/src/packet_io.cpp +++ b/platform/src/packet_io.cpp @@ -103,7 +103,7 @@ struct metadata * API Declaration ******************************************************************************/ -struct packet_io *packet_io_create(const char *profile, int thread_num); +struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask); void packet_io_destory(struct packet_io *handle); int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx); @@ -176,7 +176,7 @@ static int is_upstream_keepalive_packet(marsio_buff_t *rx_buff); * API Definition ******************************************************************************/ -struct packet_io *packet_io_create(const char *profile, int thread_num) +struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask) { int opt = 1; struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io)); @@ -195,6 +195,12 @@ struct packet_io *packet_io_create(const char *profile, int thread_num) goto error_out; } + if (marsio_option_set(handle->instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, coremask, sizeof(cpu_set_t)) != 0) + { + LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO); + goto error_out; + } + if (marsio_option_set(handle->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) { LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO); @@ -289,6 +295,26 @@ void packet_io_destory(struct packet_io *handle) } } +int packet_io_thread_init(struct packet_io *handle, struct thread_ctx *thread_ctx) +{ + if (marsio_thread_init(handle->instance) != 0) + { + LOG_ERROR("%s: unable to init marsio thread %d", LOG_TAG_PKTIO, thread_ctx->thread_index); + return -1; + } + + return 0; +} + +void packet_io_thread_wait(struct packet_io *handle, struct thread_ctx *thread_ctx, int timeout_ms) +{ + struct mr_vdev *vdevs[] = { + handle->dev_nf_interface.mr_dev, + handle->dev_endpoint.mr_dev}; + + marsio_poll_wait(handle->instance, vdevs, 2, thread_ctx->thread_index, timeout_ms); +} + // return n_packet_recv int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx) { diff --git a/platform/src/sce.cpp b/platform/src/sce.cpp index 760c745..1974cc5 100644 --- a/platform/src/sce.cpp +++ b/platform/src/sce.cpp @@ -54,11 +54,20 @@ struct sce_ctx *sce_ctx_create(const char *profile) MESA_load_profile_int_def(profile, "system", "enable_debug", (int *)&(ctx->enable_debug), 0); MESA_load_profile_int_def(profile, "system", "firewall_sids", (int *)&(ctx->firewall_sids), 1001); MESA_load_profile_int_def(profile, "system", "nr_worker_threads", (int *)&(ctx->nr_worker_threads), 8); - MESA_load_profile_int_def(profile, "system", "enable_cpu_affinity", (int *)&ctx->enable_cpu_affinity, 0); MESA_load_profile_uint_range(profile, "system", "cpu_affinity_mask", MAX_THREAD_NUM, (unsigned int *)ctx->cpu_affinity_mask); + MESA_load_profile_int_def(profile, "system", "ts_update_interval_ms", (int *)&(ctx->ts_update_interval_ms), 1); ctx->nr_worker_threads = MIN(ctx->nr_worker_threads, MAX_THREAD_NUM); - ctx->io = packet_io_create(profile, ctx->nr_worker_threads); + CPU_ZERO(&ctx->coremask); + for (int i = 0; i < ctx->nr_worker_threads; i++) + { + int cpu_id = ctx->cpu_affinity_mask[i]; + CPU_SET(cpu_id, &ctx->coremask); + } + + ctx->ts = timestamp_new(ctx->ts_update_interval_ms); + + ctx->io = packet_io_create(profile, ctx->nr_worker_threads, &ctx->coremask); if (ctx->io == NULL) { goto error_out; @@ -95,6 +104,7 @@ void sce_ctx_destory(struct sce_ctx *ctx) policy_enforcer_destory(ctx->enforcer); global_metrics_destory(ctx->metrics); packet_io_destory(ctx->io); + timestamp_free(ctx->ts); free(ctx); ctx = NULL;