TSG-14342 tsg-service-chaining-engine calls marsio_poll_wait when idle to yield the CPU to other processes

Author: luwenpeng
Date: 2023-03-27 14:37:18 +08:00
parent e481abeb02
commit 66d6a266b4
10 changed files with 242 additions and 108 deletions
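
At a glance (editor's condensation of the worker_thread_cycle diff below; identifiers are taken from the diff, error handling and metrics bookkeeping omitted): each worker now polls both devices and, when neither returns packets, blocks in packet_io_thread_wait() (which calls marsio_poll_wait()) until the next sf_metrics deadline instead of spinning on the CPU.

while (1)
{
    n_pkt_recv_from_nf = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx);
    n_pkt_recv_from_endp = packet_io_polling_endpoint(handle, thread_ctx->thread_index, thread_ctx);
    if (n_pkt_recv_from_nf == 0 && n_pkt_recv_from_endp == 0)
    {
        /* idle: sleep at most until the next sf_metrics send is due */
        timeout_ms = sf_metrics_last_send_ts + sf_metrics_send_interval - timestamp_get_msec(ts);
        packet_io_thread_wait(handle, thread_ctx, timeout_ms > 0 ? timeout_ms : 0);
    }
    /* ... session-table reset and sf_metrics send checks as in the diff ... */
}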


@@ -1,4 +1,4 @@
add_library(common src/addr_tuple4.cpp src/session_table.cpp src/raw_packet.cpp src/ctrl_packet.cpp src/bfd.cpp src/utils.cpp src/g_vxlan.cpp src/log.cpp)
add_library(common src/addr_tuple4.cpp src/session_table.cpp src/raw_packet.cpp src/ctrl_packet.cpp src/bfd.cpp src/utils.cpp src/g_vxlan.cpp src/log.cpp src/timestamp.cpp)
target_link_libraries(common PUBLIC cjson)
target_link_libraries(common PUBLIC mrzcpd)
target_link_libraries(common PUBLIC MESA_handle_logger)


@@ -0,0 +1,24 @@
#ifndef _TIMESTAMP_H
#define _TIMESTAMP_H
#ifdef __cplusplus
extern "C"
{
#endif
#include "utils.h"
struct timestamp *timestamp_new(uint64_t update_interval_ms);
void timestamp_free(struct timestamp *ts);
void timestamp_update(struct timestamp *ts);
uint64_t timestamp_update_interval_ms(struct timestamp *ts);
uint64_t timestamp_get_sec(struct timestamp *ts);
uint64_t timestamp_get_msec(struct timestamp *ts);
#ifdef __cplusplus
}
#endif
#endif
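
Usage sketch (assembled from how the rest of this commit uses the API, not part of the header itself): the main thread owns the struct and refreshes it on a fixed interval, while worker threads only read it, so they get millisecond-resolution time without a clock_gettime() call per packet.

/* main thread (mirrors the main() loop later in this commit) */
struct timestamp *ts = timestamp_new(1 /* ts_update_interval_ms */);
while (1)
{
    usleep(timestamp_update_interval_ms(ts) * 1000);
    timestamp_update(ts);
}

/* worker threads: read-only accessors */
uint64_t now_ms = timestamp_get_msec(ts);
uint64_t now_s = timestamp_get_sec(ts);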


@@ -18,6 +18,14 @@ extern "C"
#define LOG_TAG_SF_METRICS "SF_METRICS"
#define LOG_TAG_SF_STATUS "SF_STATUS"
#define LOG_TAG_SCE "SCE"
#define LOG_TAG_TIMESTAMP "TIMESTAMP"
#define ATOMIC_INC(x) __atomic_fetch_add(x, 1, __ATOMIC_RELAXED)
#define ATOMIC_DEC(x) __atomic_fetch_sub(x, 1, __ATOMIC_RELAXED)
#define ATOMIC_READ(x) __atomic_fetch_add(x, 0, __ATOMIC_RELAXED)
#define ATOMIC_ZERO(x) __atomic_fetch_and(x, 0, __ATOMIC_RELAXED)
#define ATOMIC_ADD(x, y) __atomic_fetch_add(x, y, __ATOMIC_RELAXED)
#define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED)
/******************************************************************************
* fixed_num_array

common/src/timestamp.cpp (new file)

@@ -0,0 +1,65 @@
#include <time.h>
#include <stdlib.h>
#include "log.h"
#include "timestamp.h"
// 1 s = 1000 ms
// 1 ms = 1000 us
// 1 us = 1000 ns
struct timestamp
{
struct timespec timestamp;
uint64_t update_interval_ms;
};
struct timestamp *timestamp_new(uint64_t update_interval_ms)
{
struct timestamp *ts = (struct timestamp *)calloc(1, sizeof(struct timestamp));
ts->update_interval_ms = update_interval_ms;
timestamp_update(ts);
LOG_DEBUG("%s: TIMESTAMP->update_interval_ms : %lu", LOG_TAG_TIMESTAMP, timestamp_update_interval_ms(ts));
LOG_DEBUG("%s: TIMESTAMP->current_sec : %lu", LOG_TAG_TIMESTAMP, timestamp_get_sec(ts));
LOG_DEBUG("%s: TIMESTAMP->current_msec : %lu", LOG_TAG_TIMESTAMP, timestamp_get_msec(ts));
return ts;
}
void timestamp_free(struct timestamp *ts)
{
if (ts)
{
free(ts);
ts = NULL;
}
}
void timestamp_update(struct timestamp *ts)
{
struct timespec temp;
clock_gettime(CLOCK_MONOTONIC, &temp);
ATOMIC_SET(&(ts->timestamp.tv_sec), temp.tv_sec);
ATOMIC_SET(&(ts->timestamp.tv_nsec), temp.tv_nsec);
}
uint64_t timestamp_update_interval_ms(struct timestamp *ts)
{
return ts->update_interval_ms;
}
uint64_t timestamp_get_sec(struct timestamp *ts)
{
uint64_t sec = ATOMIC_READ(&(ts->timestamp.tv_sec));
return sec;
}
uint64_t timestamp_get_msec(struct timestamp *ts)
{
uint64_t sec = ATOMIC_READ(&(ts->timestamp.tv_sec));
uint64_t nsec = ATOMIC_READ(&(ts->timestamp.tv_nsec));
return sec * 1000 + nsec / 1000000;
}


@@ -1,9 +1,9 @@
[system]
nr_worker_threads=8
enable_cpu_affinity=1
cpu_affinity_mask=2,3,4-9
firewall_sids=1001
enable_debug=0
ts_update_interval_ms=1
# Only when (disable_coredump == 1 || (enable_breakpad == 1 && enable_breakpad_upload == 1)) is satisfied, the core will not be generated locally
disable_coredump=0


@@ -6,9 +6,12 @@ extern "C"
{
#endif
struct packet_io *packet_io_create(const char *profile, int thread_num);
struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask);
void packet_io_destory(struct packet_io *handle);
int packet_io_thread_init(struct packet_io *handle, struct thread_ctx *thread_ctx);
void packet_io_thread_wait(struct packet_io *handle, struct thread_ctx *thread_ctx, int timeout_ms);
int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx);
int packet_io_polling_endpoint(struct packet_io *handle, int thread_seq, void *ctx);
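
Calling contract for the two new entry points, as implemented later in this commit (packet_io_thread_init wraps marsio_thread_init, packet_io_thread_wait wraps marsio_poll_wait over the nf_interface and endpoint vdevs):

/* once per worker thread, before entering the polling loop */
if (packet_io_thread_init(handle, thread_ctx) != 0)
    goto error_out;

/* only when both polling calls returned 0 packets; blocks for at most timeout_ms */
packet_io_thread_wait(handle, thread_ctx, timeout_ms);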


@@ -6,7 +6,10 @@ extern "C"
{
#endif
#include <sched.h>
#include "policy.h"
#include "timestamp.h"
#include "packet_io.h"
#include "session_table.h"
@@ -29,8 +32,6 @@ struct thread_ctx
struct sce_ctx *ref_sce_ctx;
int session_table_need_reset;
int sf_metrics_need_send;
int cpu_mask;
};
/******************************************************************************
@@ -80,9 +81,11 @@ struct sce_ctx
int enable_debug;
int firewall_sids;
int nr_worker_threads;
int enable_cpu_affinity;
int ts_update_interval_ms;
int cpu_affinity_mask[MAX_THREAD_NUM];
cpu_set_t coremask;
struct timestamp *ts;
struct packet_io *io;
struct global_metrics *metrics;
struct policy_enforcer *enforcer;


@@ -56,33 +56,39 @@ static void *worker_thread_cycle(void *arg)
{
struct thread_ctx *thread_ctx = (struct thread_ctx *)arg;
struct packet_io *handle = thread_ctx->ref_io;
int n_packet_recv;
struct sce_ctx *sce_ctx = thread_ctx->ref_sce_ctx;
struct timestamp *ts = sce_ctx->ts;
int timeout_ms = 0;
int n_pkt_recv_from_nf = 0;
int n_pkt_recv_from_endp = 0;
char thread_name[16];
uint64_t sf_metrics_last_send_ts = timestamp_get_msec(ts);
uint64_t sf_metrics_send_interval = sf_metrics_get_interval(thread_ctx->sf_metrics) * 1000;
snprintf(thread_name, sizeof(thread_name), "sce:worker-%d", thread_ctx->thread_index);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
char affinity[32] = {0};
if (thread_ctx->cpu_mask >= 0)
if (packet_io_thread_init(handle, thread_ctx) != 0)
{
thread_set_affinity(thread_ctx->cpu_mask);
snprintf(affinity, sizeof(affinity), "affinity cpu%d", thread_ctx->cpu_mask);
goto error_out;
}
LOG_INFO("%s: worker thread %d %s is running", LOG_TAG_SCE, thread_ctx->thread_index, thread_ctx->cpu_mask >= 0 ? affinity : "");
LOG_INFO("%s: worker thread %d is running", LOG_TAG_SCE, thread_ctx->thread_index);
while (1)
{
n_packet_recv = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx);
if (n_packet_recv)
n_pkt_recv_from_nf = packet_io_polling_nf_interface(handle, thread_ctx->thread_index, thread_ctx);
n_pkt_recv_from_endp = packet_io_polling_endpoint(handle, thread_ctx->thread_index, thread_ctx);
if (n_pkt_recv_from_nf == 0 && n_pkt_recv_from_endp == 0)
{
// LOG_INFO("%s: worker thread %d recv %03d packets from nf_interface", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv);
timeout_ms = sf_metrics_last_send_ts + sf_metrics_send_interval - timestamp_get_msec(ts);
if (timeout_ms <= 0)
{
timeout_ms = 0;
}
n_packet_recv = packet_io_polling_endpoint(handle, thread_ctx->thread_index, thread_ctx);
if (n_packet_recv)
{
// LOG_INFO("%s: worker thread %d recv %03d packets from endpoint", LOG_TAG_SCE, thread_ctx->thread_index, n_packet_recv);
packet_io_thread_wait(handle, thread_ctx, timeout_ms);
}
if (__atomic_fetch_add(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED) > 0)
@@ -91,14 +97,15 @@ static void *worker_thread_cycle(void *arg)
__atomic_fetch_and(&thread_ctx->session_table_need_reset, 0, __ATOMIC_RELAXED);
}
if (__atomic_fetch_add(&thread_ctx->sf_metrics_need_send, 0, __ATOMIC_RELAXED) > 0)
if (timestamp_get_msec(ts) - sf_metrics_last_send_ts >= sf_metrics_send_interval)
{
sf_metrics_send(thread_ctx->sf_metrics);
sf_metrics_reset(thread_ctx->sf_metrics);
__atomic_fetch_and(&thread_ctx->sf_metrics_need_send, 0, __ATOMIC_RELAXED);
sf_metrics_last_send_ts = timestamp_get_msec(ts);
}
}
error_out:
LOG_ERROR("%s: worker thread %d exiting", LOG_TAG_SCE, thread_ctx->thread_index);
return (void *)NULL;
}
@@ -106,6 +113,9 @@ static void *worker_thread_cycle(void *arg)
int main(int argc, char **argv)
{
const char *profile = "./conf/sce.conf";
uint64_t ts_update_interval = 0;
uint64_t g_metrics_last_send_ts = 0;
uint64_t g_metrics_send_interval = 0;
int opt = 0;
while ((opt = getopt(argc, argv, "vh")) != -1)
@@ -157,8 +167,6 @@ int main(int argc, char **argv)
ctx->work_threads[i].ref_enforcer = ctx->enforcer;
ctx->work_threads[i].ref_sce_ctx = ctx;
ctx->work_threads[i].session_table_need_reset = 0;
ctx->work_threads[i].sf_metrics_need_send = 0;
ctx->work_threads[i].cpu_mask = ctx->enable_cpu_affinity ? ctx->cpu_affinity_mask[i] : -1;
}
for (int i = 0; i < ctx->nr_worker_threads; i++)
@@ -171,34 +179,21 @@ int main(int argc, char **argv)
}
}
struct timespec current_time;
struct timespec g_metrics_last_send_time;
struct timespec sf_metrics_last_send_time;
clock_gettime(CLOCK_MONOTONIC, &current_time);
clock_gettime(CLOCK_MONOTONIC, &g_metrics_last_send_time);
clock_gettime(CLOCK_MONOTONIC, &sf_metrics_last_send_time);
timestamp_update(ctx->ts);
ts_update_interval = timestamp_update_interval_ms(ctx->ts);
g_metrics_last_send_ts = timestamp_get_msec(ctx->ts);
g_metrics_send_interval = ctx->metrics->config.statsd_cycle * 1000;
while (1)
{
if (current_time.tv_sec - g_metrics_last_send_time.tv_sec >= ctx->metrics->config.statsd_cycle)
if (timestamp_get_msec(ctx->ts) - g_metrics_last_send_ts >= g_metrics_send_interval)
{
clock_gettime(CLOCK_MONOTONIC, &g_metrics_last_send_time);
global_metrics_dump(ctx->metrics);
g_metrics_last_send_ts = timestamp_get_msec(ctx->ts);
}
if (current_time.tv_sec - sf_metrics_last_send_time.tv_sec >= sf_metrics_get_interval(ctx->work_threads[0].sf_metrics))
{
clock_gettime(CLOCK_MONOTONIC, &sf_metrics_last_send_time);
for (int i = 0; i < ctx->nr_worker_threads; i++)
{
struct thread_ctx *thread_ctx = &ctx->work_threads[i];
__atomic_fetch_add(&thread_ctx->sf_metrics_need_send, 1, __ATOMIC_RELAXED);
}
}
sleep(MIN(ctx->metrics->config.statsd_cycle, sf_metrics_get_interval(ctx->work_threads[0].sf_metrics)));
clock_gettime(CLOCK_MONOTONIC, &current_time);
usleep(ts_update_interval * 1000);
timestamp_update(ctx->ts);
}
error_out:


@@ -103,7 +103,7 @@ struct metadata
* API Declaration
******************************************************************************/
struct packet_io *packet_io_create(const char *profile, int thread_num);
struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask);
void packet_io_destory(struct packet_io *handle);
int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx);
@@ -176,7 +176,7 @@ static int is_upstream_keepalive_packet(marsio_buff_t *rx_buff);
* API Definition
******************************************************************************/
struct packet_io *packet_io_create(const char *profile, int thread_num)
struct packet_io *packet_io_create(const char *profile, int thread_num, cpu_set_t *coremask)
{
int opt = 1;
struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io));
@@ -195,6 +195,12 @@ struct packet_io *packet_io_create(const char *profile, int thread_num)
goto error_out;
}
if (marsio_option_set(handle->instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, coremask, sizeof(cpu_set_t)) != 0)
{
LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO);
goto error_out;
}
if (marsio_option_set(handle->instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0)
{
LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_TAG_PKTIO);
@@ -289,6 +295,26 @@ void packet_io_destory(struct packet_io *handle)
}
}
int packet_io_thread_init(struct packet_io *handle, struct thread_ctx *thread_ctx)
{
if (marsio_thread_init(handle->instance) != 0)
{
LOG_ERROR("%s: unable to init marsio thread %d", LOG_TAG_PKTIO, thread_ctx->thread_index);
return -1;
}
return 0;
}
void packet_io_thread_wait(struct packet_io *handle, struct thread_ctx *thread_ctx, int timeout_ms)
{
struct mr_vdev *vdevs[] = {
handle->dev_nf_interface.mr_dev,
handle->dev_endpoint.mr_dev};
marsio_poll_wait(handle->instance, vdevs, 2, thread_ctx->thread_index, timeout_ms);
}
// return n_packet_recv
int packet_io_polling_nf_interface(struct packet_io *handle, int thread_seq, void *ctx)
{


@@ -54,11 +54,20 @@ struct sce_ctx *sce_ctx_create(const char *profile)
MESA_load_profile_int_def(profile, "system", "enable_debug", (int *)&(ctx->enable_debug), 0);
MESA_load_profile_int_def(profile, "system", "firewall_sids", (int *)&(ctx->firewall_sids), 1001);
MESA_load_profile_int_def(profile, "system", "nr_worker_threads", (int *)&(ctx->nr_worker_threads), 8);
MESA_load_profile_int_def(profile, "system", "enable_cpu_affinity", (int *)&ctx->enable_cpu_affinity, 0);
MESA_load_profile_uint_range(profile, "system", "cpu_affinity_mask", MAX_THREAD_NUM, (unsigned int *)ctx->cpu_affinity_mask);
MESA_load_profile_int_def(profile, "system", "ts_update_interval_ms", (int *)&(ctx->ts_update_interval_ms), 1);
ctx->nr_worker_threads = MIN(ctx->nr_worker_threads, MAX_THREAD_NUM);
ctx->io = packet_io_create(profile, ctx->nr_worker_threads);
CPU_ZERO(&ctx->coremask);
for (int i = 0; i < ctx->nr_worker_threads; i++)
{
int cpu_id = ctx->cpu_affinity_mask[i];
CPU_SET(cpu_id, &ctx->coremask);
}
ctx->ts = timestamp_new(ctx->ts_update_interval_ms);
ctx->io = packet_io_create(profile, ctx->nr_worker_threads, &ctx->coremask);
if (ctx->io == NULL)
{
goto error_out;
@@ -95,6 +104,7 @@ void sce_ctx_destory(struct sce_ctx *ctx)
policy_enforcer_destory(ctx->enforcer);
global_metrics_destory(ctx->metrics);
packet_io_destory(ctx->io);
timestamp_free(ctx->ts);
free(ctx);
ctx = NULL;