load config from toml file

This commit is contained in:
luwenpeng
2024-01-29 14:15:33 +08:00
parent 07b1da819a
commit 7d7cc8e90c
10 changed files with 525 additions and 98 deletions

View File

@@ -7,6 +7,7 @@
#include <sys/prctl.h>
#include <signal.h>
#include "config.h"
#include "packet.h"
#include "timestamp.h"
#include "session_manager.h"
@@ -34,49 +35,15 @@ struct thread_context
struct session_manager *sess_mgr;
};
struct stellar_ctx
struct stellar_context
{
uint64_t need_exit;
uint16_t thread_num;
struct config config;
struct session_manager_config sess_mgr_cfg;
struct thread_context thread_ctx[128];
struct thread_context thread_ctx[MAX_THREAD_NUM];
} g_stellar_ctx = {
.need_exit = 0,
.thread_num = 1,
.sess_mgr_cfg = {
// max session number
.max_tcp_session_num = 3,
.max_udp_session_num = 3,
// session overload
.tcp_overload_evict_old_sess = 1, // 1: evict old session, 0: bypass new session
.udp_overload_evict_old_sess = 1, // 1: evict old session, 0: bypass new session
// tcp timeout
.tcp_timeout_init = 2,
.tcp_timeout_handshake = 2,
.tcp_timeout_data = 2,
.tcp_timeout_half_closed = 2,
.tcp_timeout_time_wait = 2,
.tcp_timeout_discard = 2,
// udp timeout
.udp_timeout_data = 1,
// tcp duplicate packet filter
.tcp_dupkt_filter_enable = 1,
.tcp_dupkt_filter_capacity = 1000,
.tcp_dupkt_filter_timeout = 10,
.tcp_dupkt_filter_error_rate = 0.0001,
// udp eviction filter
.udp_eviction_filter_enable = 1,
.udp_eviction_filter_capacity = 1000,
.udp_eviction_filter_timeout = 10,
.udp_eviction_filter_error_rate = 0.0001,
}};
};
static int recv_packet(const char **data)
{
@@ -116,7 +83,7 @@ static void signal_handler(int signo)
}
}
static void __packet_dispatch(const struct packet *pkt)
static void __packet_plugin_dispatch_example(const struct packet *pkt)
{
if (pkt == NULL)
{
@@ -128,7 +95,7 @@ static void __packet_dispatch(const struct packet *pkt)
printf("<= packet dispatch\n\n");
}
static void __session_dispatch(struct session *sess)
static void __session_plugin_dispatch_example(struct session *sess)
{
if (sess == NULL)
{
@@ -140,42 +107,11 @@ static void __session_dispatch(struct session *sess)
session_dump(sess);
printf("<= session dispatch\n\n");
// after session dispatch, we should reset session current packet and direction
session_set0_cur_pkt(sess, NULL);
session_set_cur_dir(sess, SESSION_DIR_NONE);
}
static int thread_context_init(struct stellar_ctx *ctx)
{
for (uint16_t i = 0; i < ctx->thread_num; i++)
{
struct thread_context *thread_ctx = &ctx->thread_ctx[i];
thread_ctx->index = i;
thread_ctx->need_exit = 0;
thread_ctx->is_runing = 0;
thread_ctx->sess_mgr = session_manager_create(&ctx->sess_mgr_cfg);
if (thread_ctx->sess_mgr == NULL)
{
STELLAR_LOG_ERROR("unable to create session manager");
return -1;
}
}
return 0;
}
static void thread_context_free(struct stellar_ctx *ctx)
{
for (uint16_t i = 0; i < ctx->thread_num; i++)
{
struct thread_context *thread_ctx = &ctx->thread_ctx[i];
if (ATOMIC_READ(&thread_ctx->is_runing) == 0)
{
session_manager_destroy(thread_ctx->sess_mgr);
}
}
}
static void *thread_cycle(void *arg)
{
uint16_t len = 0;
@@ -200,19 +136,19 @@ static void *thread_cycle(void *arg)
}
packet_parse(&pkt, data, len);
__packet_dispatch(&pkt);
__packet_plugin_dispatch_example(&pkt);
sess = session_manager_update_session(sess_mgr, &pkt);
__session_dispatch(sess);
__session_plugin_dispatch_example(sess);
send_packet(data, len);
sess = session_manager_get_evicted_session(sess_mgr);
__session_dispatch(sess);
__session_plugin_dispatch_example(sess);
poll_wait:
sess = session_manager_get_expired_session(sess_mgr);
__session_dispatch(sess);
__session_plugin_dispatch_example(sess);
usleep(1000); // session_manager_get_expire_interval(sess_mgr); (seconds)
}
@@ -223,9 +159,89 @@ static void *thread_cycle(void *arg)
return NULL;
}
static int thread_context_init(struct stellar_context *ctx)
{
struct system_config *sys_cfg = &ctx->config.sys_cfg;
struct session_manager_config *sess_mgr_cfg = &ctx->config.sess_mgr_cfg;
for (uint16_t i = 0; i < sys_cfg->nr_threads; i++)
{
struct thread_context *thread_ctx = &ctx->thread_ctx[i];
thread_ctx->index = i;
thread_ctx->need_exit = 0;
thread_ctx->is_runing = 0;
thread_ctx->sess_mgr = session_manager_create(sess_mgr_cfg);
if (thread_ctx->sess_mgr == NULL)
{
STELLAR_LOG_ERROR("unable to create session manager");
return -1;
}
}
return 0;
}
static void thread_context_free(struct stellar_context *ctx)
{
struct system_config *sys_cfg = &ctx->config.sys_cfg;
for (uint16_t i = 0; i < sys_cfg->nr_threads; i++)
{
struct thread_context *thread_ctx = &ctx->thread_ctx[i];
if (ATOMIC_READ(&thread_ctx->is_runing) == 0)
{
session_manager_destroy(thread_ctx->sess_mgr);
}
}
}
static int thread_create(struct stellar_context *ctx)
{
struct system_config *sys_cfg = &ctx->config.sys_cfg;
for (uint16_t i = 0; i < sys_cfg->nr_threads; i++)
{
struct thread_context *thread_ctx = &ctx->thread_ctx[i];
if (pthread_create(&thread_ctx->tid, NULL, thread_cycle, (void *)thread_ctx) < 0)
{
STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno));
return -1;
}
}
return 0;
}
static void thread_destroy(struct stellar_context *ctx)
{
struct system_config *sys_cfg = &ctx->config.sys_cfg;
for (uint16_t i = 0; i < sys_cfg->nr_threads; i++)
{
struct thread_context *thread_ctx = &ctx->thread_ctx[i];
ATOMIC_SET(&thread_ctx->need_exit, 1);
while (ATOMIC_READ(&thread_ctx->is_runing) == 1)
{
sleep(1);
}
}
}
int main(int argc, char **argv)
{
// TODO parse command line
if (argc != 2)
{
printf("usage: %s <config file>\n", argv[0]);
return 0;
}
if (config_load(&g_stellar_ctx.config, argv[1]) != 0)
{
return -1;
}
config_dump(&g_stellar_ctx.config);
// TODO init log
@@ -243,14 +259,10 @@ int main(int argc, char **argv)
goto error_out;
}
for (uint16_t i = 0; i < g_stellar_ctx.thread_num; i++)
if (thread_create(&g_stellar_ctx) != 0)
{
struct thread_context *thread_ctx = &g_stellar_ctx.thread_ctx[i];
if (pthread_create(&thread_ctx->tid, NULL, thread_cycle, (void *)thread_ctx) < 0)
{
STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno));
goto error_out;
}
STELLAR_LOG_ERROR("unable to create worker thread");
goto error_out;
}
while (!g_stellar_ctx.need_exit)
@@ -259,17 +271,9 @@ int main(int argc, char **argv)
sleep(1);
}
for (uint16_t i = 0; i < g_stellar_ctx.thread_num; i++)
{
struct thread_context *thread_ctx = &g_stellar_ctx.thread_ctx[i];
ATOMIC_SET(&thread_ctx->need_exit, 1);
while (ATOMIC_READ(&thread_ctx->is_runing) == 1)
{
sleep(1);
}
}
error_out:
thread_destroy(&g_stellar_ctx);
thread_context_free(&g_stellar_ctx);
// TODO free plugin