refactor(session manager): turning the session manager into a stellar module

This commit is contained in:
luwenpeng
2024-09-20 16:56:05 +08:00
parent 620019cf8e
commit 94f1913e3e
20 changed files with 1944 additions and 1507 deletions

View File

@@ -185,6 +185,9 @@ uint16_t packet_get_raw_len(const struct packet *pkt);
const char *packet_get_payload(const struct packet *pkt); const char *packet_get_payload(const struct packet *pkt);
uint16_t packet_get_payload_len(const struct packet *pkt); uint16_t packet_get_payload_len(const struct packet *pkt);
void packet_set_exdata(struct packet *pkt, int idx, void *ex_ptr);
void *packet_get_exdata(struct packet *pkt, int idx);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@@ -5,7 +5,8 @@ extern "C"
{ {
#endif #endif
#include "packet.h" #include "stellar/exdata.h"
#include "stellar/packet.h"
enum packet_stage enum packet_stage
{ {
@@ -19,8 +20,10 @@ enum packet_stage
struct packet_manager; struct packet_manager;
int packet_manager_new_packet_exdata_index(struct packet_manager *pkt_mgr, const char *name, exdata_free *func, void *arg);
typedef void on_packet_stage_callback(enum packet_stage stage, struct packet *pkt, void *args); typedef void on_packet_stage_callback(enum packet_stage stage, struct packet *pkt, void *args);
int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback cb, void *args); int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback *cb, void *args);
// if two modules claim the same packet at the same stage, the second 'claim' fails. // if two modules claim the same packet at the same stage, the second 'claim' fails.
// return 0 on success // return 0 on success

View File

@@ -5,20 +5,8 @@ extern "C"
{ {
#endif #endif
#include <stdint.h>
#include "stellar/packet.h" #include "stellar/packet.h"
struct tcp_segment;
const char *tcp_segment_get_data(const struct tcp_segment *seg);
uint16_t tcp_segment_get_len(const struct tcp_segment *seg);
#define TOPIC_TCP_STREAM "TCP_STREAM" //topic message: tcp_segment
#define TOPIC_CONTROL_PACKET "CONTROL_PACKET" //topic message: packet
#define TOPIC_TCP "TCP" //topic message: session
#define TOPIC_UDP "UDP" //topic message: session
enum session_state enum session_state
{ {
SESSION_STATE_INIT = 0, SESSION_STATE_INIT = 0,
@@ -155,6 +143,9 @@ const char *session_get0_readable_addr(const struct session *sess);
void session_set_discard(struct session *sess); void session_set_discard(struct session *sess);
void session_set_exdata(struct session *sess, int idx, void *ex_ptr);
void *session_get_exdata(const struct session *sess, int idx);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@@ -0,0 +1,27 @@
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
#include "stellar/exdata.h"
#include "stellar/session.h"
struct session_manager;
int session_manager_new_packet_exdata_index(struct session_manager *sess_mgr, const char *name, exdata_free *func, void *arg);
typedef void on_session_callback(struct session *sess, struct packet *pkt, void *args);
typedef void on_tcp_stream_callback(struct session *sess, const char *tcp_payload, uint32_t tcp_payload_len, void *args);
int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_callback *cb, void *args);
int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_callback *cb, void *args);
int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_callback *cb, void *args);
int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_stream_callback *cb, void *args);
#ifdef __cplusplus
}
#endif

View File

@@ -11,6 +11,6 @@ target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger)
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra)
target_link_libraries(packet_manager tuple logger dablooms mq) target_link_libraries(packet_manager tuple logger dablooms mq exdata)
add_subdirectory(test) add_subdirectory(test)

View File

@@ -18,6 +18,7 @@ struct packet_manager_config
struct packet_manager_schema struct packet_manager_schema
{ {
struct exdata_schema *exdata;
struct mq_schema *mq; struct mq_schema *mq;
int topic_id[PACKET_STAGE_MAX]; int topic_id[PACKET_STAGE_MAX];
}; };
@@ -50,17 +51,17 @@ const char *packet_stage_to_str(enum packet_stage stage)
switch (stage) switch (stage)
{ {
case PACKET_STAGE_PREROUTING: case PACKET_STAGE_PREROUTING:
return "prerouting"; return "PACKET_STAGE_PREROUTING";
case PACKET_STAGE_INPUT: case PACKET_STAGE_INPUT:
return "input"; return "PACKET_STAGE_INPUT";
case PACKET_STAGE_FORWARD: case PACKET_STAGE_FORWARD:
return "forward"; return "PACKET_STAGE_FORWARD";
case PACKET_STAGE_OUTPUT: case PACKET_STAGE_OUTPUT:
return "output"; return "PACKET_STAGE_OUTPUT";
case PACKET_STAGE_POSTROUTING: case PACKET_STAGE_POSTROUTING:
return "postrouting"; return "PACKET_STAGE_POSTROUTING";
default: default:
return "unknown"; return "PACKET_STAGE_UNKNOWN";
} }
} }
@@ -102,7 +103,7 @@ static struct packet_manager_config *packet_manager_config_new(const char *toml_
* packet manager schema * packet manager schema
******************************************************************************/ ******************************************************************************/
static void on_packet_stage_dispatch(int topic_id, const void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg) static void on_packet_stage_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg)
{ {
assert(msg); assert(msg);
assert(dispatch_arg); assert(dispatch_arg);
@@ -120,7 +121,7 @@ static void on_packet_stage_dispatch(int topic_id, const void *msg, on_msg_cb_fu
} }
} }
((on_packet_stage_callback *)cb)(stage, pkt, cb_arg); ((on_packet_stage_callback *)(void *)cb)(stage, pkt, cb_arg);
} }
static void packet_manager_schema_free(struct packet_manager_schema *pkt_mgr_schema) static void packet_manager_schema_free(struct packet_manager_schema *pkt_mgr_schema)
@@ -138,6 +139,11 @@ static void packet_manager_schema_free(struct packet_manager_schema *pkt_mgr_sch
} }
} }
if (pkt_mgr_schema->exdata)
{
exdata_schema_free(pkt_mgr_schema->exdata);
}
free(pkt_mgr_schema); free(pkt_mgr_schema);
pkt_mgr_schema = NULL; pkt_mgr_schema = NULL;
} }
@@ -152,11 +158,17 @@ static struct packet_manager_schema *packet_manager_schema_new(struct mq_schema
return NULL; return NULL;
} }
pkt_mgr_schema->mq = mq; pkt_mgr_schema->exdata = exdata_schema_new();
if (pkt_mgr_schema->exdata == NULL)
{
PACKET_MANAGER_LOG_ERROR("failed to create exdata_schema");
goto error_out;
}
pkt_mgr_schema->mq = mq;
for (int i = 0; i < PACKET_STAGE_MAX; i++) for (int i = 0; i < PACKET_STAGE_MAX; i++)
{ {
pkt_mgr_schema->topic_id[i] = mq_schema_create_topic(pkt_mgr_schema->mq, packet_stage_to_str(i), (on_msg_dispatch_cb_func *)on_packet_stage_dispatch, pkt_mgr_schema, NULL, NULL); pkt_mgr_schema->topic_id[i] = mq_schema_create_topic(pkt_mgr_schema->mq, packet_stage_to_str(i), &on_packet_stage_dispatch, pkt_mgr_schema, NULL, NULL);
if (pkt_mgr_schema->topic_id[i] < 0) if (pkt_mgr_schema->topic_id[i] < 0)
{ {
PACKET_MANAGER_LOG_ERROR("failed to create topic %s", packet_stage_to_str(i)); PACKET_MANAGER_LOG_ERROR("failed to create topic %s", packet_stage_to_str(i));
@@ -276,7 +288,12 @@ void packet_manager_free(struct packet_manager *pkt_mgr)
} }
} }
int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback cb, void *args) int packet_manager_new_packet_exdata_index(struct packet_manager *pkt_mgr, const char *name, exdata_free *func, void *arg)
{
return exdata_schema_new_index(pkt_mgr->schema->exdata, name, func, arg);
}
int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback *cb, void *args)
{ {
return mq_schema_subscribe(pkt_mgr->schema->mq, pkt_mgr->schema->topic_id[stage], (on_msg_cb_func *)cb, args); return mq_schema_subscribe(pkt_mgr->schema->mq, pkt_mgr->schema->topic_id[stage], (on_msg_cb_func *)cb, args);
} }
@@ -291,6 +308,8 @@ void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, str
void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt) void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt)
{ {
struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
struct exdata_runtime *exdata_rt = exdata_runtime_new(pkt_mgr->schema->exdata);
packet_set_user_data(pkt, exdata_rt);
runtime->stat.total.pkts_ingress++; runtime->stat.total.pkts_ingress++;
runtime->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++; runtime->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++;
@@ -307,6 +326,9 @@ struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t th
runtime->stat.total.pkts_egress++; runtime->stat.total.pkts_egress++;
runtime->stat.queue[PACKET_STAGE_MAX].pkts_out++; runtime->stat.queue[PACKET_STAGE_MAX].pkts_out++;
TAILQ_REMOVE(&runtime->queue[PACKET_STAGE_MAX], pkt, stage_tqe); TAILQ_REMOVE(&runtime->queue[PACKET_STAGE_MAX], pkt, stage_tqe);
struct exdata_runtime *exdata_rt = packet_get_user_data(pkt);
exdata_runtime_free(exdata_rt);
} }
return pkt; return pkt;
} }

View File

@@ -5,6 +5,7 @@ extern "C"
{ {
#endif #endif
#include "stellar/mq.h"
#include "stellar/packet_manager.h" #include "stellar/packet_manager.h"
#define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1) #define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1)

View File

@@ -1,46 +0,0 @@
#pragma once
#ifdef __cplusplus
extern "C"
{
#endif
#include "stellar/packet_manager.h"
#define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1)
struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, const char *toml_file);
void packet_manager_free(struct packet_manager *pkt_mgr);
void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt);
void packet_manager_runtime_ingress(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt);
struct packet *packet_manager_runtime_egress(struct packet_manager_runtime *pkt_mgr_rt);
void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt);
/******************************************************************************
* for gtest
******************************************************************************/
struct packet_manager_stat
{
struct
{
uint64_t pkts_ingress;
uint64_t pkts_egress;
} total;
struct
{
uint64_t pkts_in; // include the packets that are scheduled
uint64_t pkts_out; // include the packets that are claimed
uint64_t pkts_claim;
uint64_t pkts_schedule;
} queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets
} __attribute__((aligned(64)));
const char *packet_stage_to_str(enum packet_stage stage);
void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime);
struct packet_manager_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime);
#ifdef __cplusplus
}
#endif

View File

@@ -5,6 +5,7 @@
#include "log_internal.h" #include "log_internal.h"
#include "packet_helper.h" #include "packet_helper.h"
#include "packet_internal.h" #include "packet_internal.h"
#include "stellar/exdata.h"
#define PACKET_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet", format, ##__VA_ARGS__) #define PACKET_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet", format, ##__VA_ARGS__)
@@ -951,3 +952,15 @@ int packet_is_fragment(const struct packet *pkt)
{ {
return (pkt->frag_layer) ? 1 : 0; return (pkt->frag_layer) ? 1 : 0;
} }
void packet_set_exdata(struct packet *pkt, int idx, void *ex_ptr)
{
struct exdata_runtime *exdata_rt = (struct exdata_runtime *)packet_get_user_data(pkt);
exdata_set(exdata_rt, idx, ex_ptr);
}
void *packet_get_exdata(struct packet *pkt, int idx)
{
struct exdata_runtime *exdata_rt = (struct exdata_runtime *)packet_get_user_data(pkt);
return exdata_get(exdata_rt, idx);
}

View File

@@ -4,12 +4,13 @@ add_library(session_manager
session_table.c session_table.c
session_timer.c session_timer.c
session_filter.c session_filter.c
session_manager.c
session_transition.c session_transition.c
session_manager_runtime.c
session_manager.c
) )
target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include) target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(session_manager timeout packet_manager tcp_reassembly) target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata)
add_subdirectory(test) add_subdirectory(test)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,6 @@
#include "stellar/exdata.h"
#include "session_internal.h" #include "session_internal.h"
#include "session_manager.h" #include "session_manager_runtime.h"
void session_init(struct session *sess) void session_init(struct session *sess)
{ {
@@ -208,6 +209,7 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
{ {
sess->sess_mgr_stat->tcp_segs_consumed++; sess->sess_mgr_stat->tcp_segs_consumed++;
half->in_order_ref++; half->in_order_ref++;
half->in_order.user_data = sess;
return &half->in_order; return &half->in_order;
} }
else else
@@ -221,6 +223,7 @@ struct tcp_segment *session_get_tcp_segment(struct session *sess)
// TODO // TODO
sess->sess_mgr_stat->tcp_segs_consumed++; sess->sess_mgr_stat->tcp_segs_consumed++;
sess->sess_mgr_stat->tcp_segs_reordered++; sess->sess_mgr_stat->tcp_segs_reordered++;
seg->user_data = sess;
} }
return seg; return seg;
} }
@@ -461,3 +464,15 @@ void session_print(const struct session *sess)
session_to_str(sess, 0, buff, sizeof(buff)); session_to_str(sess, 0, buff, sizeof(buff));
printf("%s\n", buff); printf("%s\n", buff);
} }
void session_set_exdata(struct session *sess, int idx, void *ex_ptr)
{
struct exdata_runtime *rte = (struct exdata_runtime *)session_get_user_data(sess);
exdata_set(rte, idx, ex_ptr);
}
void *session_get_exdata(const struct session *sess, int idx)
{
struct exdata_runtime *rte = (struct exdata_runtime *)session_get_user_data(sess);
return exdata_get(rte, idx);
}

View File

@@ -7,7 +7,7 @@ extern "C"
{ {
#endif #endif
#include "session_manager.h" #include "session_manager_runtime.h"
static struct session_manager_config sess_mgr_cfg = { static struct session_manager_config sess_mgr_cfg = {
.session_id_seed = 0xFFFFF, .session_id_seed = 0xFFFFF,

View File

@@ -17,7 +17,7 @@
#include "stellar_stat.h" #include "stellar_stat.h"
#include "packet_internal.h" #include "packet_internal.h"
#include "session_internal.h" #include "session_internal.h"
#include "session_manager.h" #include "session_manager_runtime.h"
#define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__)
#define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__)

View File

@@ -7,7 +7,7 @@ extern "C"
#include "packet_io.h" #include "packet_io.h"
#include "ip_reassembly.h" #include "ip_reassembly.h"
#include "session_manager.h" #include "session_manager_runtime.h"
struct thread_stat struct thread_stat
{ {

View File

@@ -256,27 +256,3 @@ uint32_t tcp_reassembly_get_recv_next(struct tcp_reassembly *tcp_reass)
return tcp_reass->recv_next; return tcp_reass->recv_next;
} }
const char *tcp_segment_get_data(const struct tcp_segment *seg)
{
if (seg == NULL)
{
return NULL;
}
else
{
return (const char *)seg->data;
}
}
uint16_t tcp_segment_get_len(const struct tcp_segment *seg)
{
if (seg == NULL)
{
return 0;
}
else
{
return seg->len;
}
}

View File

@@ -12,6 +12,7 @@ struct tcp_segment
{ {
uint32_t len; uint32_t len;
const void *data; const void *data;
void *user_data;
}; };
struct tcp_segment *tcp_segment_new(uint32_t seq, const void *data, uint32_t len); struct tcp_segment *tcp_segment_new(uint32_t seq, const void *data, uint32_t len);

View File

@@ -18,9 +18,6 @@ global:
packet_build_udp; packet_build_udp;
packet_build_l3; packet_build_l3;
tcp_segment_get_data;
tcp_segment_get_len;
exdata_*; exdata_*;
mq_*; mq_*;
stellar_module_*; stellar_module_*;
@@ -57,6 +54,9 @@ global:
log_print; log_print;
log_check_level; log_check_level;
session_manager_module_on_init;
session_manager_module_on_exit;
http_message_*; http_message_*;
http_decoder_init; http_decoder_init;
http_decoder_exit; http_decoder_exit;