add config for main config

This commit is contained in:
luwenpeng
2024-01-26 14:41:40 +08:00
parent 3575cf9367
commit d61da92e92
9 changed files with 109 additions and 58 deletions

View File

@@ -1,3 +1,4 @@
add_subdirectory(log)
add_subdirectory(timestamp) add_subdirectory(timestamp)
add_subdirectory(tuple) add_subdirectory(tuple)
add_subdirectory(packet) add_subdirectory(packet)

7
src/log/CMakeLists.txt Normal file
View File

@@ -0,0 +1,7 @@
###############################################################################
# log
###############################################################################
add_library(log log.cpp)
target_include_directories(log PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(log)

3
src/log/log.cpp Normal file
View File

@@ -0,0 +1,3 @@
#include "log.h"
// TODO

23
src/log/log.h Normal file
View File

@@ -0,0 +1,23 @@
#ifndef _LOG_H
#define _LOG_H
#ifdef __cpluscplus
extern "C"
{
#endif
#include <stdio.h>
#define LOG_DEBUG(module, format, ...) \
fprintf(stderr, "DEBUG (%s), " format "\n", module, ##__VA_ARGS__);
#define LOG_ERROR(module, format, ...) \
fprintf(stderr, "ERROR (%s), " format "\n", module, ##__VA_ARGS__);
// TODO
#ifdef __cpluscplus
}
#endif
#endif

View File

@@ -5,6 +5,6 @@
add_library(packet packet.cpp) add_library(packet packet.cpp)
target_include_directories(packet PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(packet PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash) target_include_directories(packet PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash)
target_link_libraries(packet tuple) target_link_libraries(packet tuple log)
add_subdirectory(test) add_subdirectory(test)

View File

@@ -9,14 +9,15 @@ extern "C"
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include "tuple.h" #include "tuple.h"
#include "log.h"
#define PACKET_MAX_LAYERS 16 #define PACKET_MAX_LAYERS 16
// #define PACKET_LOG_ERROR(format, ...) void(0) #define PACKET_LOG_ERROR(format, ...) LOG_ERROR("packet", format, ##__VA_ARGS__)
#ifndef PACKET_LOG_ERROR #ifndef PACKET_LOG_ERROR
#define PACKET_LOG_ERROR(format, ...) \ #define PACKET_LOG_ERROR(format, ...) \
fprintf(stderr, "ERROR (packet), " format "\n", ##__VA_ARGS__); fprintf(stderr, "ERROR (packet), " format "\n", ##__VA_ARGS__);
#endif #endif
// #define PACKET_LOG_DEBUG(format, ...) void(0) #define PACKET_LOG_DEBUG(format, ...) LOG_DEBUG("packet", format, ##__VA_ARGS__)
#ifndef PACKET_LOG_DEBUG #ifndef PACKET_LOG_DEBUG
#define PACKET_LOG_DEBUG(format, ...) \ #define PACKET_LOG_DEBUG(format, ...) \
fprintf(stderr, "DEBUG (packet), " format "\n", ##__VA_ARGS__); fprintf(stderr, "DEBUG (packet), " format "\n", ##__VA_ARGS__);

View File

@@ -11,6 +11,6 @@ add_library(session_manager
session_manager.cpp session_manager.cpp
) )
target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(session_manager timeout dupkt_filter eviction_filter) target_link_libraries(session_manager timeout dupkt_filter eviction_filter log)
add_subdirectory(test) add_subdirectory(test)

View File

@@ -7,11 +7,14 @@ extern "C"
#endif #endif
#include "session.h" #include "session.h"
#include "log.h"
#define SESSION_LOG_ERROR(format, ...) LOG_ERROR("session", format, ##__VA_ARGS__)
#ifndef SESSION_LOG_ERROR #ifndef SESSION_LOG_ERROR
#define SESSION_LOG_ERROR(format, ...) \ #define SESSION_LOG_ERROR(format, ...) \
fprintf(stderr, "ERROR (session), " format "\n", ##__VA_ARGS__); fprintf(stderr, "ERROR (session), " format "\n", ##__VA_ARGS__);
#endif #endif
#define SESSION_LOG_DEBUG(format, ...) LOG_DEBUG("session", format, ##__VA_ARGS__)
#ifndef SESSION_LOG_DEBUG #ifndef SESSION_LOG_DEBUG
#define SESSION_LOG_DEBUG(format, ...) \ #define SESSION_LOG_DEBUG(format, ...) \
fprintf(stderr, "DEBUG (session), " format "\n", ##__VA_ARGS__); fprintf(stderr, "DEBUG (session), " format "\n", ##__VA_ARGS__);

View File

@@ -11,10 +11,12 @@
#include "timestamp.h" #include "timestamp.h"
#include "session_manager.h" #include "session_manager.h"
#define STELLAR_LOG_ERROR(format, ...) LOG_ERROR("stellar", format, ##__VA_ARGS__)
#ifndef STELLAR_LOG_ERROR #ifndef STELLAR_LOG_ERROR
#define STELLAR_LOG_ERROR(format, ...) \ #define STELLAR_LOG_ERROR(format, ...) \
fprintf(stderr, "ERROR (stellar), " format "\n", ##__VA_ARGS__); fprintf(stderr, "ERROR (stellar), " format "\n", ##__VA_ARGS__);
#endif #endif
#define STELLAR_LOG_DEBUG(format, ...) LOG_DEBUG("stellar", format, ##__VA_ARGS__)
#ifndef STELLAR_LOG_DEBUG #ifndef STELLAR_LOG_DEBUG
#define STELLAR_LOG_DEBUG(format, ...) \ #define STELLAR_LOG_DEBUG(format, ...) \
fprintf(stderr, "DEBUG (stellar), " format "\n", ##__VA_ARGS__); fprintf(stderr, "DEBUG (stellar), " format "\n", ##__VA_ARGS__);
@@ -23,7 +25,7 @@
#define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED) #define ATOMIC_SET(x, y) __atomic_store_n(x, y, __ATOMIC_RELAXED)
#define ATOMIC_READ(x) __atomic_load_n(x, __ATOMIC_RELAXED) #define ATOMIC_READ(x) __atomic_load_n(x, __ATOMIC_RELAXED)
struct thread_ctx struct thread_context
{ {
pthread_t tid; pthread_t tid;
uint16_t index; uint16_t index;
@@ -35,19 +37,23 @@ struct thread_ctx
struct stellar_ctx struct stellar_ctx
{ {
uint64_t need_exit; uint64_t need_exit;
uint16_t max_worker_num; uint16_t thread_num;
struct session_manager_config sess_mgr_cfg; struct session_manager_config sess_mgr_cfg;
struct thread_ctx thread_ctx[128]; struct thread_context thread_ctx[128];
} g_stellar_ctx = { } g_stellar_ctx = {
.need_exit = 0, .need_exit = 0,
.max_worker_num = 1, .thread_num = 1,
.sess_mgr_cfg = { .sess_mgr_cfg = {
// max session number // max session number
.max_tcp_session_num = 3, .max_tcp_session_num = 3,
.max_udp_session_num = 3, .max_udp_session_num = 3,
// session overload
.tcp_overload_evict_old_sess = 1, // 1: evict old session, 0: bypass new session
.udp_overload_evict_old_sess = 1, // 1: evict old session, 0: bypass new session
// tcp timeout // tcp timeout
.tcp_timeout_init = 2, .tcp_timeout_init = 2,
.tcp_timeout_handshake = 2, .tcp_timeout_handshake = 2,
@@ -110,8 +116,25 @@ static void signal_handler(int signo)
} }
} }
static void __packet_dispatch(const struct packet *pkt)
{
if (pkt == NULL)
{
return;
}
printf("\n");
printf("=> packet dispatch: %p\n", pkt);
printf("<= packet dispatch\n\n");
}
static void __session_dispatch(struct session *sess) static void __session_dispatch(struct session *sess)
{ {
if (sess == NULL)
{
return;
}
printf("\n"); printf("\n");
printf("=> session dispatch: %p\n", sess); printf("=> session dispatch: %p\n", sess);
session_dump(sess); session_dump(sess);
@@ -121,17 +144,17 @@ static void __session_dispatch(struct session *sess)
session_set_cur_dir(sess, SESSION_DIR_NONE); session_set_cur_dir(sess, SESSION_DIR_NONE);
} }
static int thread_ctx_init(struct stellar_ctx *ctx) static int thread_context_init(struct stellar_ctx *ctx)
{ {
for (uint16_t i = 0; i < ctx->max_worker_num; i++) for (uint16_t i = 0; i < ctx->thread_num; i++)
{ {
struct thread_ctx *thd_ctx = &ctx->thread_ctx[i]; struct thread_context *thread_ctx = &ctx->thread_ctx[i];
thd_ctx->index = i; thread_ctx->index = i;
thd_ctx->need_exit = 0; thread_ctx->need_exit = 0;
thd_ctx->is_runing = 0; thread_ctx->is_runing = 0;
thd_ctx->sess_mgr = session_manager_create(&ctx->sess_mgr_cfg); thread_ctx->sess_mgr = session_manager_create(&ctx->sess_mgr_cfg);
if (thd_ctx->sess_mgr == NULL) if (thread_ctx->sess_mgr == NULL)
{ {
STELLAR_LOG_ERROR("unable to create session manager"); STELLAR_LOG_ERROR("unable to create session manager");
return -1; return -1;
@@ -141,14 +164,14 @@ static int thread_ctx_init(struct stellar_ctx *ctx)
return 0; return 0;
} }
static void thread_ctx_free(struct stellar_ctx *ctx) static void thread_context_free(struct stellar_ctx *ctx)
{ {
for (uint16_t i = 0; i < ctx->max_worker_num; i++) for (uint16_t i = 0; i < ctx->thread_num; i++)
{ {
struct thread_ctx *thd_ctx = &ctx->thread_ctx[i]; struct thread_context *thread_ctx = &ctx->thread_ctx[i];
if (ATOMIC_READ(&thd_ctx->is_runing) == 0) if (ATOMIC_READ(&thread_ctx->is_runing) == 0)
{ {
session_manager_destroy(thd_ctx->sess_mgr); session_manager_destroy(thread_ctx->sess_mgr);
} }
} }
} }
@@ -159,16 +182,16 @@ static void *thread_cycle(void *arg)
const char *data = NULL; const char *data = NULL;
struct packet pkt; struct packet pkt;
struct session *sess = NULL; struct session *sess = NULL;
struct thread_ctx *thd_ctx = (struct thread_ctx *)arg; struct thread_context *thread_ctx = (struct thread_context *)arg;
struct session_manager *sess_mgr = thd_ctx->sess_mgr; struct session_manager *sess_mgr = thread_ctx->sess_mgr;
char thread_name[16]; char thd_name[16];
ATOMIC_SET(&thd_ctx->is_runing, 1); ATOMIC_SET(&thread_ctx->is_runing, 1);
snprintf(thread_name, sizeof(thread_name), "stellar:%d", thd_ctx->index); snprintf(thd_name, sizeof(thd_name), "stellar:%d", thread_ctx->index);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL);
STELLAR_LOG_DEBUG("worker thread %s runing\n", thread_name); STELLAR_LOG_DEBUG("worker thread %s runing\n", thd_name);
while (ATOMIC_READ(&thd_ctx->need_exit) == 0) while (ATOMIC_READ(&thread_ctx->need_exit) == 0)
{ {
len = recv_packet(&data); len = recv_packet(&data);
if (data == NULL) if (data == NULL)
@@ -177,35 +200,25 @@ static void *thread_cycle(void *arg)
} }
packet_parse(&pkt, data, len); packet_parse(&pkt, data, len);
__packet_dispatch(&pkt);
sess = session_manager_update_session(sess_mgr, &pkt); sess = session_manager_update_session(sess_mgr, &pkt);
if (sess == NULL)
{
goto fast_forward;
}
__session_dispatch(sess); __session_dispatch(sess);
sess = session_manager_get_evicted_session(sess_mgr);
if (sess)
{
__session_dispatch(sess);
}
fast_forward:
send_packet(data, len); send_packet(data, len);
sess = session_manager_get_expired_session(sess_mgr); sess = session_manager_get_evicted_session(sess_mgr);
if (sess) __session_dispatch(sess);
{
__session_dispatch(sess);
}
continue;
poll_wait: poll_wait:
sleep(session_manager_get_expire_interval(sess_mgr)); sess = session_manager_get_expired_session(sess_mgr);
__session_dispatch(sess);
usleep(1000); // session_manager_get_expire_interval(sess_mgr); (seconds)
} }
ATOMIC_SET(&thd_ctx->is_runing, 0); ATOMIC_SET(&thread_ctx->is_runing, 0);
STELLAR_LOG_DEBUG("worker thread %s exit\n", thread_name); STELLAR_LOG_DEBUG("worker thread %s exit\n", thd_name);
return NULL; return NULL;
} }
@@ -224,16 +237,16 @@ int main(int argc, char **argv)
timestamp_update(); timestamp_update();
if (thread_ctx_init(&g_stellar_ctx) != 0) if (thread_context_init(&g_stellar_ctx) != 0)
{ {
STELLAR_LOG_ERROR("unable to init thread context"); STELLAR_LOG_ERROR("unable to init thread context");
goto error_out; goto error_out;
} }
for (uint16_t i = 0; i < g_stellar_ctx.max_worker_num; i++) for (uint16_t i = 0; i < g_stellar_ctx.thread_num; i++)
{ {
struct thread_ctx *thd_ctx = &g_stellar_ctx.thread_ctx[i]; struct thread_context *thread_ctx = &g_stellar_ctx.thread_ctx[i];
if (pthread_create(&thd_ctx->tid, NULL, thread_cycle, (void *)thd_ctx) < 0) if (pthread_create(&thread_ctx->tid, NULL, thread_cycle, (void *)thread_ctx) < 0)
{ {
STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno)); STELLAR_LOG_ERROR("unable to create worker thread, error %d: %s", errno, strerror(errno));
goto error_out; goto error_out;
@@ -246,18 +259,18 @@ int main(int argc, char **argv)
sleep(1); sleep(1);
} }
for (uint16_t i = 0; i < g_stellar_ctx.max_worker_num; i++) for (uint16_t i = 0; i < g_stellar_ctx.thread_num; i++)
{ {
struct thread_ctx *thd_ctx = &g_stellar_ctx.thread_ctx[i]; struct thread_context *thread_ctx = &g_stellar_ctx.thread_ctx[i];
ATOMIC_SET(&thd_ctx->need_exit, 1); ATOMIC_SET(&thread_ctx->need_exit, 1);
while (ATOMIC_READ(&thd_ctx->is_runing) == 1) while (ATOMIC_READ(&thread_ctx->is_runing) == 1)
{ {
sleep(1); sleep(1);
} }
} }
error_out: error_out:
thread_ctx_free(&g_stellar_ctx); thread_context_free(&g_stellar_ctx);
// TODO free plugin // TODO free plugin