From d61da92e9203f90580e6abff573fd7af8ff11f9b Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Fri, 26 Jan 2024 14:41:40 +0800 Subject: [PATCH] add config for main config --- src/CMakeLists.txt | 1 + src/log/CMakeLists.txt | 7 ++ src/log/log.cpp | 3 + src/log/log.h | 23 +++++++ src/packet/CMakeLists.txt | 2 +- src/packet/packet.h | 5 +- src/session/CMakeLists.txt | 2 +- src/session/session_manager.h | 3 + src/stellar/stellar.cpp | 121 +++++++++++++++++++--------------- 9 files changed, 109 insertions(+), 58 deletions(-) create mode 100644 src/log/CMakeLists.txt create mode 100644 src/log/log.cpp create mode 100644 src/log/log.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 449f0c2..4f82bc2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,3 +1,4 @@ +add_subdirectory(log) add_subdirectory(timestamp) add_subdirectory(tuple) add_subdirectory(packet) diff --git a/src/log/CMakeLists.txt b/src/log/CMakeLists.txt new file mode 100644 index 0000000..791be8f --- /dev/null +++ b/src/log/CMakeLists.txt @@ -0,0 +1,7 @@ +############################################################################### +# log +############################################################################### + +add_library(log log.cpp) +target_include_directories(log PUBLIC ${CMAKE_CURRENT_LIST_DIR}) +target_link_libraries(log) \ No newline at end of file diff --git a/src/log/log.cpp b/src/log/log.cpp new file mode 100644 index 0000000..54c7224 --- /dev/null +++ b/src/log/log.cpp @@ -0,0 +1,3 @@ +#include "log.h" + +// TODO \ No newline at end of file diff --git a/src/log/log.h b/src/log/log.h new file mode 100644 index 0000000..c96e86c --- /dev/null +++ b/src/log/log.h @@ -0,0 +1,23 @@ +#ifndef _LOG_H +#define _LOG_H + +#ifdef __cpluscplus +extern "C" +{ +#endif + +#include + +#define LOG_DEBUG(module, format, ...) \ + fprintf(stderr, "DEBUG (%s), " format "\n", module, ##__VA_ARGS__); + +#define LOG_ERROR(module, format, ...) \ + fprintf(stderr, "ERROR (%s), " format "\n", module, ##__VA_ARGS__); + + // TODO + +#ifdef __cpluscplus +} +#endif + +#endif diff --git a/src/packet/CMakeLists.txt b/src/packet/CMakeLists.txt index 0157926..890e14b 100644 --- a/src/packet/CMakeLists.txt +++ b/src/packet/CMakeLists.txt @@ -5,6 +5,6 @@ add_library(packet packet.cpp) target_include_directories(packet PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash) -target_link_libraries(packet tuple) +target_link_libraries(packet tuple log) add_subdirectory(test) \ No newline at end of file diff --git a/src/packet/packet.h b/src/packet/packet.h index aacbf87..187e4d8 100644 --- a/src/packet/packet.h +++ b/src/packet/packet.h @@ -9,14 +9,15 @@ extern "C" #include #include #include "tuple.h" +#include "log.h" #define PACKET_MAX_LAYERS 16 -// #define PACKET_LOG_ERROR(format, ...) void(0) +#define PACKET_LOG_ERROR(format, ...) LOG_ERROR("packet", format, ##__VA_ARGS__) #ifndef PACKET_LOG_ERROR #define PACKET_LOG_ERROR(format, ...) \ fprintf(stderr, "ERROR (packet), " format "\n", ##__VA_ARGS__); #endif -// #define PACKET_LOG_DEBUG(format, ...) void(0) +#define PACKET_LOG_DEBUG(format, ...) LOG_DEBUG("packet", format, ##__VA_ARGS__) #ifndef PACKET_LOG_DEBUG #define PACKET_LOG_DEBUG(format, ...) \ fprintf(stderr, "DEBUG (packet), " format "\n", ##__VA_ARGS__); diff --git a/src/session/CMakeLists.txt b/src/session/CMakeLists.txt index fb475eb..c33bf4e 100644 --- a/src/session/CMakeLists.txt +++ b/src/session/CMakeLists.txt @@ -11,6 +11,6 @@ add_library(session_manager session_manager.cpp ) target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) -target_link_libraries(session_manager timeout dupkt_filter eviction_filter) +target_link_libraries(session_manager timeout dupkt_filter eviction_filter log) add_subdirectory(test) \ No newline at end of file diff --git a/src/session/session_manager.h b/src/session/session_manager.h index 5e0fa30..7f2ea23 100644 --- a/src/session/session_manager.h +++ b/src/session/session_manager.h @@ -7,11 +7,14 @@ extern "C" #endif #include "session.h" +#include "log.h" +#define SESSION_LOG_ERROR(format, ...) LOG_ERROR("session", format, ##__VA_ARGS__) #ifndef SESSION_LOG_ERROR #define SESSION_LOG_ERROR(format, ...) \ fprintf(stderr, "ERROR (session), " format "\n", ##__VA_ARGS__); #endif +#define SESSION_LOG_DEBUG(format, ...) LOG_DEBUG("session", format, ##__VA_ARGS__) #ifndef SESSION_LOG_DEBUG #define SESSION_LOG_DEBUG(format, ...) \ fprintf(stderr, "DEBUG (session), " format "\n", ##__VA_ARGS__); diff --git a/src/stellar/stellar.cpp b/src/stellar/stellar.cpp index fec5b08..f6559d8 100644 --- a/src/stellar/stellar.cpp +++ b/src/stellar/stellar.cpp @@ -11,10 +11,12 @@ #include "timestamp.h" #include "session_manager.h" +#define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__) #ifndef STELLAR_LOG_ERROR #define STELLAR_LOG_ERROR(format, ...) \ fprintf(stderr, "ERROR (stellar), " format "\n", ##__VA_ARGS__); #endif +#define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__) #ifndef STELLAR_LOG_DEBUG #define STELLAR_LOG_DEBUG(format, ...) \ fprintf(stderr, "DEBUG (stellar), " format "\n", ##__VA_ARGS__); @@ -23,7 +25,7 @@ #define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED) #define ATOMIC_READ(x) __atomic_load_n(x, __ATOMIC_RELAXED) -struct thread_ctx +struct thread_context { pthread_t tid; uint16_t index; @@ -35,19 +37,23 @@ struct thread_ctx struct stellar_ctx { uint64_t need_exit; - uint16_t max_worker_num; + uint16_t thread_num; struct session_manager_config sess_mgr_cfg; - struct thread_ctx thread_ctx[128]; + struct thread_context thread_ctx[128]; } g_stellar_ctx = { .need_exit = 0, - .max_worker_num = 1, + .thread_num = 1, .sess_mgr_cfg = { // max session number .max_tcp_session_num = 3, .max_udp_session_num = 3, + // session overload + .tcp_overload_evict_old_sess = 1, // 1: evict old session, 0: bypass new session + .udp_overload_evict_old_sess = 1, // 1: evict old session, 0: bypass new session + // tcp timeout .tcp_timeout_init = 2, .tcp_timeout_handshake = 2, @@ -110,8 +116,25 @@ static void signal_handler(int signo) } } +static void __packet_dispatch(const struct packet *pkt) +{ + if (pkt == NULL) + { + return; + } + + printf("\n"); + printf("=> packet dispatch: %p\n", pkt); + printf("<= packet dispatch\n\n"); +} + static void __session_dispatch(struct session *sess) { + if (sess == NULL) + { + return; + } + printf("\n"); printf("=> session dispatch: %p\n", sess); session_dump(sess); @@ -121,17 +144,17 @@ static void __session_dispatch(struct session *sess) session_set_cur_dir(sess, SESSION_DIR_NONE); } -static int thread_ctx_init(struct stellar_ctx *ctx) +static int thread_context_init(struct stellar_ctx *ctx) { - for (uint16_t i = 0; i < ctx->max_worker_num; i++) + for (uint16_t i = 0; i < ctx->thread_num; i++) { - struct thread_ctx *thd_ctx = &ctx->thread_ctx[i]; - thd_ctx->index = i; - thd_ctx->need_exit = 0; - thd_ctx->is_runing = 0; + struct thread_context *thread_ctx = &ctx->thread_ctx[i]; + thread_ctx->index = i; + thread_ctx->need_exit = 0; + thread_ctx->is_runing = 0; - thd_ctx->sess_mgr = session_manager_create(&ctx->sess_mgr_cfg); - if (thd_ctx->sess_mgr == NULL) + thread_ctx->sess_mgr = session_manager_create(&ctx->sess_mgr_cfg); + if (thread_ctx->sess_mgr == NULL) { STELLAR_LOG_ERROR("unable to create session manager"); return -1; @@ -141,14 +164,14 @@ static int thread_ctx_init(struct stellar_ctx *ctx) return 0; } -static void thread_ctx_free(struct stellar_ctx *ctx) +static void thread_context_free(struct stellar_ctx *ctx) { - for (uint16_t i = 0; i < ctx->max_worker_num; i++) + for (uint16_t i = 0; i < ctx->thread_num; i++) { - struct thread_ctx *thd_ctx = &ctx->thread_ctx[i]; - if (ATOMIC_READ(&thd_ctx->is_runing) == 0) + struct thread_context *thread_ctx = &ctx->thread_ctx[i]; + if (ATOMIC_READ(&thread_ctx->is_runing) == 0) { - session_manager_destroy(thd_ctx->sess_mgr); + session_manager_destroy(thread_ctx->sess_mgr); } } } @@ -159,16 +182,16 @@ static void *thread_cycle(void *arg) const char *data = NULL; struct packet pkt; struct session *sess = NULL; - struct thread_ctx *thd_ctx = (struct thread_ctx *)arg; - struct session_manager *sess_mgr = thd_ctx->sess_mgr; - char thread_name[16]; + struct thread_context *thread_ctx = (struct thread_context *)arg; + struct session_manager *sess_mgr = thread_ctx->sess_mgr; + char thd_name[16]; - ATOMIC_SET(&thd_ctx->is_runing, 1); - snprintf(thread_name, sizeof(thread_name), "stellar:%d", thd_ctx->index); - prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); - STELLAR_LOG_DEBUG("worker thread %s runing\n", thread_name); + ATOMIC_SET(&thread_ctx->is_runing, 1); + snprintf(thd_name, sizeof(thd_name), "stellar:%d", thread_ctx->index); + prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); + STELLAR_LOG_DEBUG("worker thread %s runing\n", thd_name); - while (ATOMIC_READ(&thd_ctx->need_exit) == 0) + while (ATOMIC_READ(&thread_ctx->need_exit) == 0) { len = recv_packet(&data); if (data == NULL) @@ -177,35 +200,25 @@ static void *thread_cycle(void *arg) } packet_parse(&pkt, data, len); + __packet_dispatch(&pkt); + sess = session_manager_update_session(sess_mgr, &pkt); - if (sess == NULL) - { - goto fast_forward; - } __session_dispatch(sess); - sess = session_manager_get_evicted_session(sess_mgr); - if (sess) - { - __session_dispatch(sess); - } - - fast_forward: send_packet(data, len); - sess = session_manager_get_expired_session(sess_mgr); - if (sess) - { - __session_dispatch(sess); - } - continue; + sess = session_manager_get_evicted_session(sess_mgr); + __session_dispatch(sess); poll_wait: - sleep(session_manager_get_expire_interval(sess_mgr)); + sess = session_manager_get_expired_session(sess_mgr); + __session_dispatch(sess); + + usleep(1000); // session_manager_get_expire_interval(sess_mgr); (seconds) } - ATOMIC_SET(&thd_ctx->is_runing, 0); - STELLAR_LOG_DEBUG("worker thread %s exit\n", thread_name); + ATOMIC_SET(&thread_ctx->is_runing, 0); + STELLAR_LOG_DEBUG("worker thread %s exit\n", thd_name); return NULL; } @@ -224,16 +237,16 @@ int main(int argc, char **argv) timestamp_update(); - if (thread_ctx_init(&g_stellar_ctx) != 0) + if (thread_context_init(&g_stellar_ctx) != 0) { STELLAR_LOG_ERROR("unable to init thread context"); goto error_out; } - for (uint16_t i = 0; i < g_stellar_ctx.max_worker_num; i++) + for (uint16_t i = 0; i < g_stellar_ctx.thread_num; i++) { - struct thread_ctx *thd_ctx = &g_stellar_ctx.thread_ctx[i]; - if (pthread_create(&thd_ctx->tid, NULL, thread_cycle, (void *)thd_ctx) < 0) + struct thread_context *thread_ctx = &g_stellar_ctx.thread_ctx[i]; + if (pthread_create(&thread_ctx->tid, NULL, thread_cycle, (void *)thread_ctx) < 0) { STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); goto error_out; @@ -246,18 +259,18 @@ int main(int argc, char **argv) sleep(1); } - for (uint16_t i = 0; i < g_stellar_ctx.max_worker_num; i++) + for (uint16_t i = 0; i < g_stellar_ctx.thread_num; i++) { - struct thread_ctx *thd_ctx = &g_stellar_ctx.thread_ctx[i]; - ATOMIC_SET(&thd_ctx->need_exit, 1); - while (ATOMIC_READ(&thd_ctx->is_runing) == 1) + struct thread_context *thread_ctx = &g_stellar_ctx.thread_ctx[i]; + ATOMIC_SET(&thread_ctx->need_exit, 1); + while (ATOMIC_READ(&thread_ctx->is_runing) == 1) { sleep(1); } } error_out: - thread_ctx_free(&g_stellar_ctx); + thread_context_free(&g_stellar_ctx); // TODO free plugin