session manager not trigger event

This commit is contained in:
luwenpeng
2024-01-15 11:21:11 +08:00
parent 84bdd92534
commit a045c04f8d
8 changed files with 95 additions and 265 deletions

View File

@@ -25,52 +25,6 @@ uint8_t s2c_1st_md_ex = 0; // built-in ex_data index
uint8_t c2s_1st_pkt_ex = 0; // built-in ex_data index
uint8_t s2c_1st_pkt_ex = 0; // built-in ex_data index
/******************************************************************************
* ev queue
******************************************************************************/
static void event_queue_init(struct event_queue *queue)
{
queue->head_idx = 0;
queue->tail_idx = 0;
}
static bool event_queue_is_empty(struct event_queue *queue)
{
return queue->head_idx == queue->tail_idx;
}
static bool event_queue_is_full(struct event_queue *queue)
{
return (queue->tail_idx + 1) % SESSION_EVENT_QUEUE_SIZE == queue->head_idx;
}
static bool event_queue_push(struct event_queue *queue, uint32_t event)
{
if (event_queue_is_full(queue))
{
return false;
}
queue->events[queue->tail_idx] = event;
queue->tail_idx = (queue->tail_idx + 1) % SESSION_EVENT_QUEUE_SIZE;
return true;
}
static bool event_queue_pop(struct event_queue *queue, uint32_t *event)
{
if (event_queue_is_empty(queue))
{
return false;
}
*event = queue->events[queue->head_idx];
queue->head_idx = (queue->head_idx + 1) % SESSION_EVENT_QUEUE_SIZE;
return true;
}
/******************************************************************************
* session
******************************************************************************/
@@ -78,7 +32,6 @@ static bool event_queue_pop(struct event_queue *queue, uint32_t *event)
void session_init(struct session *sess)
{
memset(sess, 0, sizeof(struct session));
event_queue_init(&sess->events);
}
// session id
@@ -238,21 +191,6 @@ enum session_dir session_get_cur_dir(const struct session *sess)
return sess->cur_dir;
}
/******************************************************************************
* session event
******************************************************************************/
// session event
bool session_push_event(struct session *sess, uint32_t event)
{
return event_queue_push(&sess->events, event);
}
bool session_pop_event(struct session *sess, uint32_t *event)
{
return event_queue_pop(&sess->events, event);
}
/******************************************************************************
* session ex data
******************************************************************************/
@@ -450,23 +388,6 @@ static void udp_ex_data_tostring(uint64_t ex_data, char *buffer, size_t buffer_l
}
}
const char *session_event_tostring(enum session_event event)
{
switch (event)
{
case SESSION_EVENT_NONE:
return "none";
case SESSION_EVENT_OPENING:
return "opening";
case SESSION_EVENT_PACKET:
return "packet";
case SESSION_EVENT_CLOSING:
return "closing";
default:
return "unknown";
}
}
const char *session_state_tostring(enum session_state state)
{
switch (state)

View File

@@ -32,18 +32,6 @@ enum session_type
SESSION_TYPE_UDP = 0x2,
};
enum session_event
{
SESSION_EVENT_NONE = 0,
SESSION_EVENT_OPENING,
SESSION_EVENT_PACKET,
SESSION_EVENT_CLOSING,
// Add new event before SESSION_EVENT_MAX
SESSION_EVENT_MAX,
};
enum session_dir
{
SESSION_DIR_NONE = 0,
@@ -107,14 +95,6 @@ const struct packet *session_get0_cur_pkt(const struct session *sess);
void session_set_cur_dir(struct session *sess, enum session_dir dir);
enum session_dir session_get_cur_dir(const struct session *sess);
/******************************************************************************
* session event
******************************************************************************/
// session event
bool session_push_event(struct session *sess, uint32_t event);
bool session_pop_event(struct session *sess, uint32_t *event);
/******************************************************************************
* session ex data
******************************************************************************/
@@ -160,7 +140,6 @@ void session_run_expirecb(struct session *sess);
* session dump
******************************************************************************/
const char *session_event_tostring(enum session_event event);
const char *session_state_tostring(enum session_state state);
const char *session_type_tostring(enum session_type type);
const char *session_dir_tostring(enum session_dir dir);

View File

@@ -19,9 +19,6 @@ struct session_manager
struct session_timer *sess_timer;
struct session_queue *evicted_sess;
session_event_cb event_cb;
void *arg;
// timeout config
uint64_t timeout_toclosing;
uint64_t timeout_toclosed;
@@ -205,11 +202,6 @@ static void session_to_closed(struct session *sess, void *arg)
struct session_manager *mgr = (struct session_manager *)arg;
assert(mgr != NULL);
uint32_t event;
while (session_pop_event(sess, &event))
{
}
update_counter_on_closed(mgr, sess);
session_set_state(sess, SESSION_STATE_CLOSED);
session_set0_cur_pkt(sess, NULL);
@@ -231,7 +223,6 @@ static void session_to_closing(struct session *sess, void *arg)
update_counter_on_closing(mgr, sess);
session_set_state(sess, SESSION_STATE_CLOSING);
session_push_event(sess, SESSION_EVENT_CLOSING);
session_manager_update_session_timer(mgr, sess, session_to_closed, mgr->timeout_toclosed);
}
@@ -426,8 +417,6 @@ static int handle_tcp_new_session(struct session_manager *mgr, struct tuple6 *ke
session_set_create_time(sess, timestamp_get_msec());
update_session_base(sess, pkt, curr_dir);
session_push_event(sess, SESSION_EVENT_OPENING);
session_push_event(sess, SESSION_EVENT_PACKET);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing);
return 0;
@@ -458,8 +447,6 @@ static int handle_udp_new_session(struct session_manager *mgr, struct tuple6 *ke
session_set_create_time(sess, timestamp_get_msec());
update_session_base(sess, pkt, curr_dir);
session_push_event(sess, SESSION_EVENT_OPENING);
session_push_event(sess, SESSION_EVENT_PACKET);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing);
return 0;
@@ -476,8 +463,6 @@ static void handle_tcp_old_session(struct session_manager *mgr, struct tuple6 *k
{
update_counter_on_closing(mgr, sess);
session_set_state(sess, SESSION_STATE_CLOSING);
session_push_event(sess, SESSION_EVENT_PACKET);
session_push_event(sess, SESSION_EVENT_CLOSING);
session_manager_update_session_timer(mgr, sess, session_to_closed, mgr->timeout_toclosed);
return;
}
@@ -486,7 +471,6 @@ static void handle_tcp_old_session(struct session_manager *mgr, struct tuple6 *k
{
update_counter_on_active(mgr, sess);
session_set_state(sess, SESSION_STATE_ACTIVE);
session_push_event(sess, SESSION_EVENT_PACKET);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing);
return;
}
@@ -500,7 +484,6 @@ static void handle_udp_old_session(struct session_manager *mgr, struct tuple6 *k
update_counter_on_active(mgr, sess);
session_set_state(sess, SESSION_STATE_ACTIVE);
session_push_event(sess, SESSION_EVENT_PACKET);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing);
}
@@ -609,12 +592,6 @@ void session_manager_destroy(struct session_manager *mgr)
}
}
void session_manager_set_session_eventcb(struct session_manager *mgr, session_event_cb cb, void *arg)
{
mgr->event_cb = cb;
mgr->arg = arg;
}
void session_manager_set_timeout_toclosing(struct session_manager *mgr, uint64_t timeout_ms)
{
mgr->timeout_toclosing = timeout_ms;
@@ -655,7 +632,6 @@ struct session *session_manager_update(struct session_manager *mgr, const struct
update_counter_on_closing(mgr, unused_sess);
session_set_state(unused_sess, SESSION_STATE_CLOSING);
session_push_event(unused_sess, SESSION_EVENT_CLOSING);
session_queue_push(mgr->evicted_sess, unused_sess);
session_manager_update_session_timer(mgr, unused_sess, session_to_closed, mgr->timeout_toclosed);
}
@@ -702,34 +678,9 @@ struct session *session_manager_expire(struct session_manager *mgr)
return sess;
}
static void session_dispatch(struct session_manager *mgr, struct session *sess)
struct session *session_manager_evicte(struct session_manager *mgr)
{
if (sess == NULL)
{
return;
}
uint32_t event;
while (session_pop_event(sess, &event))
{
SESSION_LOG_DEBUG("handle \"%s\" event on session %lu", session_event_tostring((enum session_event)event), session_get_id(sess));
if (mgr->event_cb)
{
mgr->event_cb(sess, event, mgr->arg);
}
}
session_set0_cur_pkt(sess, NULL);
session_set_cur_dir(sess, SESSION_DIR_NONE);
}
void session_manager_dispatch(struct session_manager *mgr, struct session *sess)
{
// Handle sessions that are ready to be processed
session_dispatch(mgr, sess);
// Handle sessions that are evicted because the flow table is full
struct session *evicted_sess = session_queue_pop(mgr->evicted_sess);
session_dispatch(mgr, evicted_sess);
return session_queue_pop(mgr->evicted_sess);
}
uint64_t session_manager_get_sessions(struct session_manager *mgr, enum session_type type, enum session_state state)

View File

@@ -23,8 +23,6 @@ struct session_manager;
struct session_manager *session_manager_create(uint64_t max_session_num);
void session_manager_destroy(struct session_manager *mgr);
typedef void (*session_event_cb)(struct session *sess, uint32_t event, void *arg);
void session_manager_set_session_eventcb(struct session_manager *mgr, session_event_cb cb, void *arg);
void session_manager_set_timeout_toclosing(struct session_manager *mgr, uint64_t timeout_ms);
void session_manager_set_timeout_toclosed(struct session_manager *mgr, uint64_t timeout_ms);
@@ -32,7 +30,7 @@ struct session *session_manager_lookup(struct session_manager *mgr, const struct
// return null: invalid tuple6 or tcp first packet is not syn
struct session *session_manager_update(struct session_manager *mgr, const struct packet *pkt);
struct session *session_manager_expire(struct session_manager *mgr);
void session_manager_dispatch(struct session_manager *mgr, struct session *sess);
struct session *session_manager_evicte(struct session_manager *mgr);
// for debug
uint64_t session_manager_get_sessions(struct session_manager *mgr, enum session_type type, enum session_state state);

View File

@@ -15,7 +15,6 @@ extern "C"
#include "session.h"
#define EX_DATA_MAX_COUNT 128
#define SESSION_EVENT_QUEUE_SIZE 256
enum tcp_ex_data
{
@@ -39,13 +38,6 @@ enum udp_ex_data
UDP_S2C_RECVED = 1 << 1,
};
struct event_queue
{
uint32_t head_idx;
uint32_t tail_idx;
uint32_t events[SESSION_EVENT_QUEUE_SIZE];
};
struct session
{
// session id
@@ -75,12 +67,6 @@ struct session
const struct packet *cur_pkt;
enum session_dir cur_dir;
/******************************
* Session Ev Queue Zone
******************************/
struct event_queue events;
/******************************
* Session Ex Data Zone
******************************/

View File

@@ -83,34 +83,6 @@ TEST(SESSION, EX_FREE_BY_CB)
session_free_ex_data(&sess, idx);
}
TEST(SESSION, EV_QUEUE)
{
uint32_t event = 0;
struct session sess;
session_init(&sess);
EXPECT_TRUE(session_pop_event(&sess, &event) == false);
EXPECT_TRUE(session_push_event(&sess, 0x1234) == true);
EXPECT_TRUE(session_pop_event(&sess, &event) == true);
EXPECT_TRUE(event == 0x1234);
EXPECT_TRUE(session_pop_event(&sess, &event) == false);
for (int j = 0; j < 10; j++)
{
for (uint32_t i = 0; i < SESSION_EVENT_QUEUE_SIZE - 1; i++)
{
EXPECT_TRUE(session_push_event(&sess, i) == true);
}
EXPECT_TRUE(session_push_event(&sess, 0) == false);
for (uint32_t i = 0; i < SESSION_EVENT_QUEUE_SIZE - 1; i++)
{
EXPECT_TRUE(session_pop_event(&sess, &event) == true);
EXPECT_TRUE(event == i);
}
EXPECT_TRUE(session_pop_event(&sess, &event) == false);
}
}
int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);

View File

@@ -12,34 +12,41 @@
uint8_t plugin_ex = 0;
const char *plugin_ctx = "hello world";
void plugin_session_ex_free(struct session *sess, uint8_t idx, void *ex_ptr, void *arg)
static void plugin_session_ex_free(struct session *sess, uint8_t idx, void *ex_ptr, void *arg)
{
EXPECT_STREQ((char *)ex_ptr, "123");
free(ex_ptr);
}
void plugin_init(void)
static void plugin_init(void)
{
plugin_ex = session_get_ex_new_index("plugin_ex", plugin_session_ex_free, NULL);
}
void plugin_dispatch(struct session *sess, uint32_t event, void *arg)
static void __session_dispatch(struct session *sess)
{
if (sess == NULL)
{
return;
}
printf("\n");
printf("=> plugin_dispatch handle session: %p, event: \"%s\", arg: %s\n", sess, session_event_tostring((enum session_event)event), (const char *)arg);
printf("=> session dispatch: %p\n", sess);
session_dump(sess);
if (event == SESSION_EVENT_OPENING)
char *ptr = (char *)session_get0_ex_data(sess, plugin_ex);
if (ptr == NULL)
{
char *pluin_ex = strdup("123");
session_set_ex_data(sess, plugin_ex, pluin_ex);
session_set_ex_data(sess, plugin_ex, strdup("123"));
}
else
{
char *pluin_ex = (char *)session_get0_ex_data(sess, plugin_ex);
EXPECT_STREQ(pluin_ex, "123");
EXPECT_STREQ(ptr, "123");
}
printf("<= plugin_dispatch\n");
printf("<= session dispatch\n");
printf("\n");
session_set0_cur_pkt(sess, NULL);
session_set_cur_dir(sess, SESSION_DIR_NONE);
}
/******************************************************************************
@@ -66,7 +73,6 @@ TEST(SESSION_MANAGER, INIT_TO_OPENING_BY_SYN)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -101,11 +107,12 @@ TEST(SESSION_MANAGER, INIT_TO_OPENING_BY_SYN)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_CLOSING) == 0);
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
@@ -142,7 +149,6 @@ TEST(SESSION_MANAGER, INIT_TO_OPENING_BY_SYNACK)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -177,11 +183,12 @@ TEST(SESSION_MANAGER, INIT_TO_OPENING_BY_SYNACK)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_CLOSING) == 0);
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
@@ -222,7 +229,6 @@ TEST(SESSION_MANAGER, INIT_TO_ACTIVE_BY_UDP_C2S)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -257,11 +263,12 @@ TEST(SESSION_MANAGER, INIT_TO_ACTIVE_BY_UDP_C2S)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_CLOSING) == 0);
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 1);
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
@@ -298,7 +305,6 @@ TEST(SESSION_MANAGER, INIT_TO_ACTIVE_BY_UDP_S2C)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -333,11 +339,12 @@ TEST(SESSION_MANAGER, INIT_TO_ACTIVE_BY_UDP_S2C)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_CLOSING) == 0);
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 1);
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
@@ -381,7 +388,6 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_C2S_PAYLOAD)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -421,7 +427,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_C2S_PAYLOAD)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer SYN Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* c2s payload packet
@@ -459,7 +465,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_C2S_PAYLOAD)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S Payload Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* timeout
@@ -469,7 +475,8 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_C2S_PAYLOAD)
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
@@ -509,7 +516,6 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_S2C_PAYLOAD)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -549,7 +555,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_S2C_PAYLOAD)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer SYN Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* s2c payload packet
@@ -587,7 +593,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_S2C_PAYLOAD)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S Payload Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* timeout
@@ -597,7 +603,8 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_S2C_PAYLOAD)
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
// check sess mgr
@@ -635,7 +642,6 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_C2S_PAYLOAD)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -675,7 +681,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_C2S_PAYLOAD)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer SYNACK Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* c2s payload packet
@@ -713,7 +719,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_C2S_PAYLOAD)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S Payload Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* timeout
@@ -723,7 +729,8 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_C2S_PAYLOAD)
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
@@ -762,7 +769,6 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_S2C_PAYLOAD)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -802,7 +808,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_S2C_PAYLOAD)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer SYNACK Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* s2c payload packet
@@ -840,7 +846,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_S2C_PAYLOAD)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer S2C Payload Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* timeout
@@ -850,7 +856,8 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYNACK_AND_S2C_PAYLOAD)
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
@@ -894,7 +901,6 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_SYNACK_AND_ACK_AND_C2S_PAYLOA
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -934,7 +940,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_SYNACK_AND_ACK_AND_C2S_PAYLOA
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer SYN Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* synack packet
@@ -972,7 +978,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_SYNACK_AND_ACK_AND_C2S_PAYLOA
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer SYNACK Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* ack packet
@@ -1010,7 +1016,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_SYNACK_AND_ACK_AND_C2S_PAYLOA
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer ACK Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* c2s payload packet
@@ -1048,7 +1054,7 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_SYNACK_AND_ACK_AND_C2S_PAYLOA
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S TCP Payload Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* timeout
@@ -1058,7 +1064,8 @@ TEST(SESSION_MANAGER, OPENING_TO_ACTIVE_BY_SYN_AND_SYNACK_AND_ACK_AND_C2S_PAYLOA
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
// check sess mgr
@@ -1197,7 +1204,6 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -1237,7 +1243,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer SYN Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* synack packet
@@ -1275,7 +1281,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer SYNACK Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* ack packet
@@ -1313,7 +1319,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer ACK Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* c2s http req packet
@@ -1351,7 +1357,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S HTTP Req Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* s2c ack packet
@@ -1389,7 +1395,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer S2C Ack Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* s2c http resp packet1
@@ -1427,7 +1433,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer S2C HTTP Resp Packet1 <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* s2c http resp packet2
@@ -1465,7 +1471,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer S2C HTTP Resp Packet2 <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* c2s ack packet
@@ -1503,7 +1509,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S Ack Packet2 <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* c2s fin packet
@@ -1541,7 +1547,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S FIN Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* s2c fin packet
@@ -1579,7 +1585,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S FIN Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* c2s ack packet
@@ -1617,7 +1623,7 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 0);
printf("\n===> Atfer C2S ACK Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* timeout
@@ -1627,7 +1633,8 @@ TEST(SESSION_MANAGER, TCP_FULL_STREAM)
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}
@@ -1666,7 +1673,6 @@ TEST(SESSION_MANAGER, UDP_FULL_STREAM)
mgr = session_manager_create(max_session_num);
EXPECT_TRUE(mgr != NULL);
session_manager_set_session_eventcb(mgr, plugin_dispatch, (void *)plugin_ctx);
session_manager_set_timeout_toclosing(mgr, 1000);
session_manager_set_timeout_toclosed(mgr, 2000);
@@ -1706,7 +1712,7 @@ TEST(SESSION_MANAGER, UDP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 1);
printf("\n===> Atfer UDP c2s Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* UDP s2c packet
@@ -1744,7 +1750,7 @@ TEST(SESSION_MANAGER, UDP_FULL_STREAM)
EXPECT_TRUE(session_manager_get_sessions(mgr, SESSION_TYPE_UDP, SESSION_STATE_ACTIVE) == 1);
printf("\n===> Atfer UDP c2s Packet <=== \n\n");
session_manager_dispatch(mgr, sess);
__session_dispatch(sess);
/**************************************************************************
* timeout
@@ -1753,7 +1759,8 @@ TEST(SESSION_MANAGER, UDP_FULL_STREAM)
for (int i = 0; i < 4; i++)
{
timestamp_update();
session_manager_dispatch(mgr, session_manager_expire(mgr));
__session_dispatch(session_manager_expire(mgr));
__session_dispatch(session_manager_evicte(mgr));
sleep(1);
}

View File

@@ -117,11 +117,20 @@ static void signal_handler(int signo)
}
}
static void plugin_dispatch(struct session *sess, uint32_t event, void *arg)
static void __session_dispatch(struct session *sess)
{
struct thread_ctx *thd_ctx = (struct thread_ctx *)arg;
printf("=> tid[%d] plugin_dispatch handle session: %p, event: \"%s\"\n", thd_ctx->index, sess, session_event_tostring((enum session_event)event));
if (sess == NULL)
{
return;
}
printf("\n");
printf("=> session dispatch: %p\n", sess);
session_dump(sess);
printf("<= session dispatch\n");
session_set0_cur_pkt(sess, NULL);
session_set_cur_dir(sess, SESSION_DIR_NONE);
}
static void thread_ctx_init(struct stellar_ctx *ctx)
@@ -135,7 +144,6 @@ static void thread_ctx_init(struct stellar_ctx *ctx)
// session manager
thd_ctx->sess_mgr = session_manager_create(ctx->sess_mgr_max_session_num);
assert(thd_ctx->sess_mgr != NULL);
session_manager_set_session_eventcb(thd_ctx->sess_mgr, plugin_dispatch, thd_ctx);
session_manager_set_timeout_toclosing(thd_ctx->sess_mgr, ctx->sess_mgr_timeout_ms_toclsoing);
session_manager_set_timeout_toclosed(thd_ctx->sess_mgr, ctx->sess_mgr_timeout_ms_toclosed);
// duplicated packet filter
@@ -206,7 +214,7 @@ static void *thread_cycle(void *arg)
// TODO session synchronization
session_manager_dispatch(sess_mgr, sess);
__session_dispatch(sess);
dupkt_filter_add(dupkt_filter, &pkt);
fast_forward:
@@ -219,7 +227,15 @@ static void *thread_cycle(void *arg)
const struct packet *sess_1st_pkt = session_get0_1st_pkt(sess);
eviction_filter_add(thd_ctx->eviction_filter, sess_1st_pkt);
}
session_manager_dispatch(sess_mgr, sess);
__session_dispatch(sess);
sess = session_manager_evicte(sess_mgr);
if (sess && session_get_type(sess) == SESSION_TYPE_UDP)
{
const struct packet *sess_1st_pkt = session_get0_1st_pkt(sess);
eviction_filter_add(thd_ctx->eviction_filter, sess_1st_pkt);
}
__session_dispatch(sess);
// TODO get next timeout
sleep(1);