Add state to the on_session_message parameter.

* When state is closed, it implies that packet is null and the session will be destroyed
This commit is contained in:
luwenpeng
2024-10-25 19:15:28 +08:00
parent 4061d5a942
commit 03864c9731
9 changed files with 168 additions and 128 deletions

View File

@@ -21,7 +21,6 @@ struct session_manager_schema
int pkt_exdata_idx;
int topic_id_free;
int topic_id_tcp;
int topic_id_udp;
int topic_id_ctrl_pkt;
@@ -40,28 +39,25 @@ struct session_manager
* callback
******************************************************************************/
static void on_session_free_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
{
struct session *sess = (struct session *)msg;
((on_session_free_callback *)(void *)cb)(sess, cb_args);
}
static void on_session_packet_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
static void on_session_message_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
{
struct session *sess = (struct session *)msg;
struct packet *pkt = (struct packet *)session_get0_current_packet(sess);
enum session_state state = session_get_current_state(sess);
((on_session_packet_callback *)(void *)cb)(sess, pkt, cb_args);
((on_session_message_callback *)(void *)cb)(sess, state, pkt, cb_args);
}
static void on_tcp_stream_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
static void on_tcp_payload_message_dispatch(int topic_id, void *msg, on_msg_cb_func *cb, void *cb_args, void *dispatch_args)
{
struct tcp_segment *seg = (struct tcp_segment *)msg;
struct session *sess = (struct session *)seg->user_data;
enum session_state state = session_get_current_state(sess);
((on_tcp_stream_callback *)(void *)cb)(seg->user_data, seg->data, seg->len, cb_args);
((on_tcp_payload_callback *)(void *)cb)(sess, state, seg->data, seg->len, cb_args);
}
static void on_tcp_stream_free(void *msg, void *args)
static void on_tcp_payload_message_free(void *msg, void *args)
{
struct tcp_segment *seg = (struct tcp_segment *)msg;
struct session *sess = (struct session *)seg->user_data;
@@ -69,29 +65,31 @@ static void on_tcp_stream_free(void *msg, void *args)
session_free_tcp_segment(sess, seg);
}
static void on_session_free(void *msg, void *args)
static void on_session_message_free(void *msg, void *args)
{
struct session *sess = (struct session *)msg;
struct session_manager *sess_mgr = (struct session_manager *)args;
struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
char buffer[4096] = {0};
session_to_str(sess, 0, buffer, sizeof(buffer));
SESSION_MANAGER_LOG_INFO("session free: %s", buffer);
if (session_get_current_state(sess) == SESSION_STATE_CLOSED)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess);
exdata_runtime_free(exdata_rt);
session_manager_runtime_free_session(sess_mgr_rt, sess);
char buffer[4096] = {0};
session_to_str(sess, 0, buffer, sizeof(buffer));
SESSION_MANAGER_LOG_INFO("session free: %s", buffer);
struct exdata_runtime *exdata_rt = (struct exdata_runtime *)session_get_user_data(sess);
exdata_runtime_free(exdata_rt);
session_manager_runtime_free_session(sess_mgr_rt, sess);
}
}
static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr);
struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
/*
@@ -181,8 +179,7 @@ fast_path:
static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *args)
{
struct session_manager *sess_mgr = (struct session_manager *)args;
struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
int thread_id = stellar_module_manager_get_thread_id(sess_mgr->mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
struct session *sess = (struct session *)packet_get_exdata(pkt, sess_mgr->schema->pkt_exdata_idx);
@@ -227,24 +224,44 @@ static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *
}
}
static void on_polling(struct stellar_module_manager *mod_mgr, void *args)
static void publish_session_closed_message_batch(struct session_manager *sess_mgr, uint16_t thread_id, uint64_t now_ms)
{
uint64_t now_ms = clock_get_real_time_ms();
struct session *sess = NULL;
struct session *cleaned[MAX_CLEANED_SESS] = {NULL};
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr);
uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, now_ms, cleaned, MAX_CLEANED_SESS);
for (uint64_t i = 0; i < used; i++)
{
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_free, cleaned[i]);
}
sess = cleaned[i];
assert(session_get_current_state(sess) == SESSION_STATE_CLOSED);
session_set_current_packet(sess, NULL);
session_set_flow_type(sess, FLOW_TYPE_NONE);
if (session_get_type(sess) == SESSION_TYPE_TCP)
{
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp_stream, &sess->empty_seg);
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_ctrl_pkt, sess);
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_tcp, sess);
}
else
{
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_ctrl_pkt, sess);
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_udp, sess);
}
}
}
static void on_polling(struct stellar_module_manager *mod_mgr, void *args)
{
uint64_t now_ms = clock_get_real_time_ms();
struct session_manager *sess_mgr = (struct session_manager *)args;
int thread_id = stellar_module_manager_get_thread_id(mod_mgr);
publish_session_closed_message_batch(sess_mgr, thread_id, now_ms);
// TODO
// ouput stat to fs4
// session_manager_runtime_print_stat(sess_mgr_rt);
}
/******************************************************************************
@@ -257,7 +274,6 @@ void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema)
{
if (sess_mgr_schema->mq)
{
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_free);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_tcp);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_udp);
mq_schema_destroy_topic(sess_mgr_schema->mq, sess_mgr_schema->topic_id_ctrl_pkt);
@@ -270,14 +286,14 @@ void session_manager_schema_free(struct session_manager_schema *sess_mgr_schema)
}
}
struct session_manager_schema *session_manager_schema_new(struct packet_manager *pkt_mgr, struct mq_schema *mq, void *subscribe_args)
struct session_manager_schema *session_manager_schema_new(struct packet_manager *pkt_mgr, struct mq_schema *mq, void *sess_mgr)
{
if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, subscribe_args))
if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr))
{
SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD");
return NULL;
}
if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, subscribe_args))
if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr))
{
SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT");
return NULL;
@@ -305,31 +321,34 @@ struct session_manager_schema *session_manager_schema_new(struct packet_manager
goto error_out;
}
sess_mgr_schema->topic_id_free = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_FREE", &on_session_free_dispatch, NULL, &on_session_free, subscribe_args);
if (sess_mgr_schema->topic_id_free == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_FREE");
goto error_out;
}
sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP", &on_session_packet_dispatch, NULL, NULL, NULL);
/*
* Publish session closed messages to multiple topics.
* Each topic has its own session message free callback.
* To prevent the same session from being freeed multiple times,
* only TCP/UDP topics register session message free callbacks,
* and other topics do not register session message callbacks;
*
* Restriction: MQ ensures that the session message free order is consistent with the publishing order
*/
sess_mgr_schema->topic_id_tcp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP", &on_session_message_dispatch, NULL, &on_session_message_free, sess_mgr);
if (sess_mgr_schema->topic_id_tcp == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_FREE");
goto error_out;
}
sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_UDP", &on_session_packet_dispatch, NULL, NULL, NULL);
sess_mgr_schema->topic_id_udp = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_UDP", &on_session_message_dispatch, NULL, &on_session_message_free, sess_mgr);
if (sess_mgr_schema->topic_id_udp == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_UDP");
goto error_out;
}
sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_CTRL_PKT", &on_session_packet_dispatch, NULL, NULL, NULL);
sess_mgr_schema->topic_id_ctrl_pkt = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_CTRL_PKT", &on_session_message_dispatch, NULL, NULL, NULL);
if (sess_mgr_schema->topic_id_ctrl_pkt == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_CTRL_PKT");
goto error_out;
}
sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP_STREAM", &on_tcp_stream_dispatch, NULL, &on_tcp_stream_free, NULL);
sess_mgr_schema->topic_id_tcp_stream = mq_schema_create_topic(sess_mgr_schema->mq, "SESSIOM_MANAGER_TOPIC_TCP_STREAM", &on_tcp_payload_message_dispatch, NULL, &on_tcp_payload_message_free, sess_mgr);
if (sess_mgr_schema->topic_id_tcp_stream == -1)
{
SESSION_MANAGER_LOG_ERROR("failed to create topic SESSIOM_MANAGER_TOPIC_TCP_STREAM");
@@ -397,15 +416,7 @@ int session_manager_new_session_exdata_index(struct session_manager *sess_mgr, c
return exdata_schema_new_index(sess_mgr->schema->exdata, name, func, arg);
}
int session_manager_subscribe_free(struct session_manager *sess_mgr, on_session_free_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_free, (on_msg_cb_func *)(void *)cb, args);
}
int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -413,7 +424,7 @@ int session_manager_subscribe_tcp(struct session_manager *sess_mgr, on_session_p
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_tcp, (on_msg_cb_func *)(void *)cb, args);
}
int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -421,7 +432,7 @@ int session_manager_subscribe_udp(struct session_manager *sess_mgr, on_session_p
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_udp, (on_msg_cb_func *)(void *)cb, args);
}
int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_packet_callback *cb, void *args)
int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, on_session_message_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -429,7 +440,7 @@ int session_manager_subscribe_control_packet(struct session_manager *sess_mgr, o
return mq_schema_subscribe(sess_mgr->schema->mq, sess_mgr->schema->topic_id_ctrl_pkt, (on_msg_cb_func *)(void *)cb, args);
}
int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_stream_callback *cb, void *args)
int session_manager_subscribe_tcp_stream(struct session_manager *sess_mgr, on_tcp_payload_callback *cb, void *args)
{
assert(sess_mgr);
assert(cb);
@@ -462,8 +473,7 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id)
assert(sess_mgr);
assert(thread_id < sess_mgr->cfg->thread_num);
struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr;
struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(sess_mgr->mod_mgr);
struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id];
if (sess_mgr_rt == NULL)
{
@@ -473,13 +483,9 @@ void session_manager_clean(struct session_manager *sess_mgr, uint16_t thread_id)
struct session_manager_stat *stat = session_manager_runtime_get_stat(sess_mgr_rt);
while (stat->tcp_sess_used || stat->udp_sess_used)
{
struct session *cleaned[MAX_CLEANED_SESS] = {NULL};
uint64_t used = session_manager_runtime_clean_session(sess_mgr_rt, UINT64_MAX, cleaned, MAX_CLEANED_SESS);
for (uint64_t i = 0; i < used; i++)
{
mq_runtime_publish_message(mq_rt, sess_mgr->schema->topic_id_free, cleaned[i]);
mq_runtime_dispatch(mq_rt);
}
publish_session_closed_message_batch(sess_mgr, thread_id, UINT64_MAX);
// here we need to dispatch the message to ensure that the session is cleaned up
mq_runtime_dispatch(mq_rt);
}
SESSION_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", sess_mgr_rt, thread_id);