Add stellar thread cycle

This commit is contained in:
luwenpeng
2024-01-09 18:03:24 +08:00
parent 6fda3e9ddb
commit 37f4680dbc
12 changed files with 535 additions and 311 deletions

View File

@@ -23,8 +23,8 @@ struct session_manager
void *arg;
// timeout config
uint64_t timeout_to_closing;
uint64_t timeout_to_closed;
uint64_t timeout_toclosing;
uint64_t timeout_toclosed;
// session number
uint64_t tcp_opening_sess_num;
@@ -201,7 +201,7 @@ void session_manager_update_session_timer(struct session_manager *mgr, struct se
static void session_to_closed(struct session *sess, void *arg)
{
SESSION_MANAGER_LOG_DEBUG("session %lu closing expire, free session", session_get_id(sess));
SESSION_LOG_DEBUG("session %lu closing expire, free session", session_get_id(sess));
struct session_manager *mgr = (struct session_manager *)arg;
assert(mgr != NULL);
@@ -225,14 +225,14 @@ static void session_to_closed(struct session *sess, void *arg)
static void session_to_closing(struct session *sess, void *arg)
{
SESSION_MANAGER_LOG_DEBUG("session %lu packet expire, trigger closing event", session_get_id(sess));
SESSION_LOG_DEBUG("session %lu packet expire, trigger closing event", session_get_id(sess));
struct session_manager *mgr = (struct session_manager *)arg;
assert(mgr != NULL);
update_counter_on_closing(mgr, sess);
session_set_state(sess, SESSION_STATE_CLOSING);
session_push_event(sess, SESSION_EVENT_CLOSING);
session_manager_update_session_timer(mgr, sess, session_to_closed, mgr->timeout_to_closed);
session_manager_update_session_timer(mgr, sess, session_to_closed, mgr->timeout_toclosed);
}
/******************************************************************************
@@ -428,7 +428,7 @@ static int handle_tcp_new_session(struct session_manager *mgr, struct tuple6 *ke
session_push_event(sess, SESSION_EVENT_OPENING);
session_push_event(sess, SESSION_EVENT_PACKET);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_to_closing);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing);
return 0;
}
@@ -460,7 +460,7 @@ static int handle_udp_new_session(struct session_manager *mgr, struct tuple6 *ke
session_push_event(sess, SESSION_EVENT_OPENING);
session_push_event(sess, SESSION_EVENT_PACKET);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_to_closing);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing);
return 0;
}
@@ -478,7 +478,7 @@ static void handle_tcp_old_session(struct session_manager *mgr, struct tuple6 *k
session_set_state(sess, SESSION_STATE_CLOSING);
session_push_event(sess, SESSION_EVENT_PACKET);
session_push_event(sess, SESSION_EVENT_CLOSING);
session_manager_update_session_timer(mgr, sess, session_to_closed, mgr->timeout_to_closed);
session_manager_update_session_timer(mgr, sess, session_to_closed, mgr->timeout_toclosed);
return;
}
@@ -487,7 +487,7 @@ static void handle_tcp_old_session(struct session_manager *mgr, struct tuple6 *k
update_counter_on_active(mgr, sess);
session_set_state(sess, SESSION_STATE_ACTIVE);
session_push_event(sess, SESSION_EVENT_PACKET);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_to_closing);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing);
return;
}
}
@@ -501,7 +501,7 @@ static void handle_udp_old_session(struct session_manager *mgr, struct tuple6 *k
update_counter_on_active(mgr, sess);
session_set_state(sess, SESSION_STATE_ACTIVE);
session_push_event(sess, SESSION_EVENT_PACKET);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_to_closing);
session_manager_update_session_timer(mgr, sess, session_to_closing, mgr->timeout_toclosing);
}
// return 0: success
@@ -571,8 +571,8 @@ struct session_manager *session_manager_create(uint64_t max_session_num)
goto error;
}
mgr->timeout_to_closed = 2 * 1000;
mgr->timeout_to_closing = 5 * 1000;
mgr->timeout_toclosed = 2 * 1000;
mgr->timeout_toclosing = 5 * 1000;
mgr->tcp_opening_sess_num = 0;
mgr->tcp_closing_sess_num = 0;
@@ -617,12 +617,12 @@ void session_manager_set_session_eventcb(struct session_manager *mgr, session_ev
void session_manager_set_timeout_toclosing(struct session_manager *mgr, uint64_t timeout_ms)
{
mgr->timeout_to_closing = timeout_ms;
mgr->timeout_toclosing = timeout_ms;
}
void session_manager_set_timeout_toclosed(struct session_manager *mgr, uint64_t timeout_ms)
{
mgr->timeout_to_closed = timeout_ms;
mgr->timeout_toclosed = timeout_ms;
}
struct session *session_manager_lookup(struct session_manager *mgr, const struct packet *pkt)
@@ -636,7 +636,7 @@ struct session *session_manager_lookup(struct session_manager *mgr, const struct
return session_table_find_session(mgr->sess_table, &key);
}
// return null: Invalid tuple6 or tcp first packet is not syn
// return null: invalid tuple6 or tcp first packet is not syn
struct session *session_manager_update(struct session_manager *mgr, const struct packet *pkt)
{
struct tuple6 key;
@@ -657,7 +657,7 @@ struct session *session_manager_update(struct session_manager *mgr, const struct
session_set_state(unused_sess, SESSION_STATE_CLOSING);
session_push_event(unused_sess, SESSION_EVENT_CLOSING);
session_queue_push(mgr->evicted_sess, unused_sess);
session_manager_update_session_timer(mgr, unused_sess, session_to_closed, mgr->timeout_to_closed);
session_manager_update_session_timer(mgr, unused_sess, session_to_closed, mgr->timeout_toclosed);
}
sess = session_pool_alloc(mgr->sess_pool);
@@ -685,7 +685,7 @@ struct session *session_manager_update(struct session_manager *mgr, const struct
struct session *session_manager_expire(struct session_manager *mgr)
{
SESSION_MANAGER_LOG_DEBUG("current timestamp: %lu s", timestamp_get_sec());
SESSION_LOG_DEBUG("current timestamp: %lu s", timestamp_get_sec());
struct session *sess = session_timer_expire_session(mgr->sess_timer, timestamp_get_msec());
if (sess == NULL)
{
@@ -712,7 +712,7 @@ static void session_dispatch(struct session_manager *mgr, struct session *sess)
uint32_t event;
while (session_pop_event(sess, &event))
{
SESSION_MANAGER_LOG_DEBUG("handle \"%s\" event on session %lu", session_event_tostring((enum session_event)event), session_get_id(sess));
SESSION_LOG_DEBUG("handle \"%s\" event on session %lu", session_event_tostring((enum session_event)event), session_get_id(sess));
if (mgr->event_cb)
{
mgr->event_cb(sess, event, mgr->arg);
@@ -732,32 +732,37 @@ void session_manager_dispatch(struct session_manager *mgr, struct session *sess)
session_dispatch(mgr, evicted_sess);
}
uint64_t session_manager_get_tcp_opening_sess_num(struct session_manager *mgr)
uint64_t session_manager_get_sessions(struct session_manager *mgr, enum session_type type, enum session_state state)
{
return mgr->tcp_opening_sess_num;
}
if (type == SESSION_TYPE_TCP)
{
switch (state)
{
case SESSION_STATE_OPENING:
return mgr->tcp_opening_sess_num;
case SESSION_STATE_ACTIVE:
return mgr->tcp_active_sess_num;
case SESSION_STATE_CLOSING:
return mgr->tcp_closing_sess_num;
default:
return 0;
}
}
uint64_t session_manager_get_tcp_closing_sess_num(struct session_manager *mgr)
{
return mgr->tcp_closing_sess_num;
}
if (type == SESSION_TYPE_UDP)
{
switch (state)
{
case SESSION_STATE_OPENING:
return mgr->udp_opening_sess_num;
case SESSION_STATE_ACTIVE:
return mgr->udp_active_sess_num;
case SESSION_STATE_CLOSING:
return mgr->udp_closing_sess_num;
default:
return 0;
}
}
uint64_t session_manager_get_tcp_active_sess_num(struct session_manager *mgr)
{
return mgr->tcp_active_sess_num;
}
uint64_t session_manager_get_udp_opening_sess_num(struct session_manager *mgr)
{
return mgr->udp_opening_sess_num;
}
uint64_t session_manager_get_udp_closing_sess_num(struct session_manager *mgr)
{
return mgr->udp_closing_sess_num;
}
uint64_t session_manager_get_udp_active_sess_num(struct session_manager *mgr)
{
return mgr->udp_active_sess_num;
return 0;
}