Refactor(plug_mgr API): remove session_ctx, provide stellar_mq

This commit is contained in:
yangwei
2024-09-04 20:46:18 +08:00
parent 5373efdbff
commit 6e0b13f3d6
10 changed files with 320 additions and 977 deletions

View File

@@ -878,7 +878,7 @@ static void http_decoder_on_session_free(struct session *sess,void *per_session_
goto failed;
}
httpd_env->st = st;
httpd_env->plugin_id = stellar_session_plugin_register_with_hooks(st, httpd_session_ctx_new_cb,
httpd_env->plugin_id = stellar_plugin_register(st, httpd_session_ctx_new_cb,
httpd_session_ctx_free_cb, NULL,http_decoder_on_session_free,(void *)httpd_env);
if (httpd_env->plugin_id < 0)
{

View File

@@ -7,57 +7,28 @@ extern "C"
#include <stdint.h>
struct session;
struct stellar;
//return plugin_env
typedef void *plugin_on_load_func(struct stellar *st);
typedef void plugin_on_unload_func(void *plugin_env);
//return per_session_ctx
typedef void *session_ctx_new_func(struct session *sess, void *plugin_env);
typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env);
typedef void on_session_new_func(struct session *sess, void *session_ctx, void *plugin_env);
typedef void on_session_free_func(struct session *sess, void *session_ctx, void *plugin_env);
// INTRINSIC TOPIC
// TOPIC_TCP_STREAM on_msg need convert msg to (const struct tcp_segment *)
// TOPIC_UDP_INPUT/TOPIC_TCP_INPUT/TOPIC_UDP_OUTPUT/TOPIC_TCP_OUTPUT/TOPIC_CONTROL_PACKET on_msg need convert msg to (const struct packet *)
#define TOPIC_TCP_STREAM "TCP_STREAM" //topic message: tcp_segment
#define TOPIC_CONTROL_PACKET "CONTROL_PACKET" //topic message: packet
#define TOPIC_TCP_INPUT "TCP_INPUT" //topic message: packet
#define TOPIC_TCP_OUTPUT "TCP_OUTPUT" //topic message: packet
#define TOPIC_UDP_INPUT "UDP_INPUT" //topic message: packet
#define TOPIC_UDP_OUTPUT "UDP_OUTPUT" //topic message: packet
//return session plugin_id
int stellar_session_plugin_register(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
void *plugin_env);
int stellar_session_plugin_register_with_hooks(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
on_session_new_func on_session_new,
on_session_free_func on_session_free,
void *plugin_env);
void stellar_session_plugin_dettach_current_session(struct session *sess);
struct tcp_segment;
const char *tcp_segment_get_data(const struct tcp_segment *seg);
uint16_t tcp_segment_get_len(const struct tcp_segment *seg);
#define TOPIC_TCP_STREAM "TCP_STREAM" //topic message: tcp_segment
#define TOPIC_CONTROL_PACKET "CONTROL_PACKET" //topic message: packet
#define TOPIC_TCP "TCP" //topic message: session
#define TOPIC_UDP "UDP" //topic message: session
struct packet;
typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env);
//return packet plugin_id
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env);
//return plugin_id
int stellar_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env);
//return polling work result, 0: no work, 1: work

View File

@@ -16,12 +16,15 @@ inline static void stellar_exdata_free_default(int idx __unused, void *ex_ptr, v
if(ex_ptr)FREE(ex_ptr);
}
struct packet;
int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *arg);
//packet exdata api
int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr);
void *packet_exdata_get(struct packet *pkt, int idx);
struct session;
//session exdata api
int session_exdata_set(struct session *sess, int idx, void *ex_ptr);
void *session_exdata_get(struct session *sess, int idx);

View File

@@ -31,27 +31,11 @@ enum stellar_mq_priority
STELLAR_MQ_PRIORITY_MAX,
};
//session mq api
typedef void on_session_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env);
typedef void on_msg_cb_func(int topic_id, const void *msg, void *plugin_env);
//return 0 if success, otherwise return -1.
int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id);
int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *msg, enum stellar_mq_priority priority);
int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id);
int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id);
int session_mq_topic_is_active(struct session *sess, int topic_id);
//packet mq api
typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env);
//return 0 if success, otherwise return -1.
int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only
int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg);
int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *msg, enum stellar_mq_priority priority);
int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id);
int stellar_mq_publish_message(struct stellar *st, int topic_id, void *msg);
int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *msg, enum stellar_mq_priority priority);
#ifdef __cplusplus
}

View File

@@ -13,17 +13,6 @@
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
inline static void plugin_manager_scratch_session_set(struct plugin_manager_schema *plug_mgr, int tid, struct session *sess)
{
plug_mgr->per_thread_data[tid].thread_scratch_session = sess;
}
inline static struct session *plugin_manager_scratch_session_get(struct plugin_manager_schema *plug_mgr, int tid)
{
return plug_mgr->per_thread_data[tid].thread_scratch_session;
}
static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
{
*spec_num = 0;
@@ -104,13 +93,6 @@ static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread
return;
}
static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg __attribute__((unused)))
{
struct plugin_manager_schema *plug_mgr=(struct plugin_manager_schema *)msg_free_arg;
struct session *cur_sess = plugin_manager_scratch_session_get(plug_mgr, stellar_get_current_thread_index());
if(msg && cur_sess)session_free_tcp_segment(cur_sess, (struct tcp_segment *)msg);
}
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
int spec_num;
@@ -131,14 +113,6 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
plug_mgr->st = st;
stellar_set_plugin_manger(st, plug_mgr);
plug_mgr->tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_INPUT, NULL, NULL);
plug_mgr->tcp_output_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_OUTPUT, NULL, NULL);
plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, plug_mgr);
plug_mgr->udp_input_topic_id=stellar_mq_create_topic(st, TOPIC_UDP_INPUT, NULL, NULL);
plug_mgr->udp_output_topic_id=stellar_mq_create_topic(st, TOPIC_UDP_OUTPUT, NULL, NULL);
plug_mgr->control_packet_topic_id=stellar_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
for(int i = 0; i < spec_num; i++)
{
if (specs[i].load_cb != NULL)
@@ -184,15 +158,6 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->registered_packet_plugin_array);
}
if(plug_mgr->registered_session_plugin_array)
{
struct registered_session_plugin_schema *s = NULL;
while ((s = (struct registered_session_plugin_schema *)utarray_next(plug_mgr->registered_session_plugin_array, s)))
{
if(s->registed_session_mq_subscriber_info)utarray_free(s->registed_session_mq_subscriber_info);
}
utarray_free(plug_mgr->registered_session_plugin_array);
}
plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
FREE(plug_mgr);
return;
@@ -328,18 +293,18 @@ void *packet_exdata_get(struct packet *pkt, int idx)
int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt == NULL)return -1;
if(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array == NULL)return -1;
return stellar_exdata_set(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array, plug_mgr_rt->sess_exdata_array, idx, ex_ptr);
struct stellar_exdata *sess_exdata_array = (struct stellar_exdata *)session_get_user_data(sess);
if(sess_exdata_array == NULL)return -1;
if(sess_exdata_array->plug_mgr->stellar_exdata_schema_array == NULL)return -1;
return stellar_exdata_set(sess_exdata_array->plug_mgr->stellar_exdata_schema_array, sess_exdata_array, idx, ex_ptr);
}
void *session_exdata_get(struct session *sess, int idx)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt == NULL)return NULL;
if(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
return stellar_exdata_get(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array, plug_mgr_rt->sess_exdata_array, idx);
struct stellar_exdata *sess_exdata_array = (struct stellar_exdata *)session_get_user_data(sess);
if(sess_exdata_array == NULL)return NULL;
if(sess_exdata_array->plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
return stellar_exdata_get(sess_exdata_array->plug_mgr->stellar_exdata_schema_array, sess_exdata_array, idx);
}
/*******************************
@@ -450,23 +415,9 @@ int stellar_mq_destroy_topic(struct stellar *st, int topic_id)
return 1; // success
}
static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum stellar_mq_priority priority)
{
if(stellar_mq_schema==NULL || topic_id < 0)return -1;
unsigned int len = utarray_len(stellar_mq_schema);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
msg->header.topic_id = topic_id;
msg->header.type=type;
msg->header.priority = priority;
msg->body = data;
DL_APPEND(priority_mq[priority], msg);
return 0;
}
UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info)
static int __stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info)
{
if(plug_mgr == NULL || plug_mgr->stellar_mq_schema_array==NULL || registed_mq_subscriber_info == NULL)return -1;
@@ -488,7 +439,7 @@ static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topi
{
if(cnt==p->subscriber_idx)
{
tmp_subscriber->msg_cb=plugin_on_msg_cb;
tmp_subscriber->plugin_msg_cb=plugin_on_msg_cb;
return 0;
}
cnt++;
@@ -500,7 +451,7 @@ static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topi
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
new_subscriber->plugin_idx = plugin_idx;
new_subscriber->msg_cb = plugin_on_msg_cb;
new_subscriber->plugin_msg_cb = plugin_on_msg_cb;
DL_APPEND(topic->subscribers, new_subscriber);
struct stellar_mq_subscriber_info sub_info;
@@ -509,74 +460,13 @@ static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topi
sub_info.subscriber_idx=topic->subscriber_cnt;
utarray_push_back(registed_mq_subscriber_info, &sub_info);
topic->subscriber_cnt+=1;
plug_mgr->session_topic_subscriber_num+=1;
plug_mgr->mq_topic_subscriber_num+=1;
return 0;
}
static void plugin_manager_runtime_update_plugin_ctx(struct session *sess, struct registered_session_plugin_schema *session_plugin_schema, struct session_plugin_ctx_runtime *plugin_ctx_rt)
static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt)
{
if(sess==NULL || session_plugin_schema == NULL || plugin_ctx_rt == NULL)return;
if (plugin_ctx_rt->state == INIT)
{
if (session_plugin_schema->on_ctx_new)
{
plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
{
session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
plugin_ctx_rt->plugin_ctx = NULL;
}
else
{
plugin_ctx_rt->state = ACTIVE;
}
}
}
}
static void stellar_mq_dispatch_one_session_message(struct session *sess, struct stellar_message *mq_elt)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
struct registered_session_plugin_schema *session_plugin_schema;
struct session_plugin_ctx_runtime *plugin_ctx_rt;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array,
(unsigned int)(mq_elt->header.topic_id));
if (topic)
{
int cur_sub_idx = 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx;
if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
{
plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx);
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(
plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
if (session_plugin_schema)
{
plugin_manager_runtime_update_plugin_ctx(sess, session_plugin_schema, plugin_ctx_rt);
if (sub_elt->sess_msg_cb &&
bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) !=
0) // ctx_new maybe call detach, need check again
{
sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx,
session_plugin_schema->plugin_env);
}
}
}
cur_sub_idx++;
}
if (cur_sub_idx == 0)
bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
}
}
static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct stellar_message *mq_elt)
{
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(mq_elt->st);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
struct registered_packet_plugin_schema *packet_plugin_schema;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,
@@ -585,20 +475,20 @@ static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct st
{
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
if (sub_elt->pkt_msg_cb)
if (sub_elt->plugin_msg_cb)
{
packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(
plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
if (packet_plugin_schema)
{
sub_elt->pkt_msg_cb(pkt, mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env);
sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env);
}
}
}
}
}
static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue, struct session *sess, struct packet *pkt)
static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue)
{
struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
@@ -611,8 +501,7 @@ static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct st
}
DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp)
{
if(mq_elt->header.type==ON_SESSION_TOPIC)stellar_mq_dispatch_one_session_message(sess, mq_elt);
if(mq_elt->header.type==ON_PACKET_TOPIC)stellar_mq_dispatch_one_packet_message(pkt, mq_elt);
stellar_mq_dispatch_one_message(mq_elt);
DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt);
DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list
@@ -645,10 +534,9 @@ static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_a
*******************************/
//return 0 if success, otherwise return -1.
int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id)
int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id)
{
if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin
int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE;
int plugin_idx=plugin_id;
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
@@ -661,143 +549,40 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_
utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
}
return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info);
return __stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info);
}
int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *data, enum stellar_mq_priority priority)
int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *data, enum stellar_mq_priority priority)
{
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
if(st==NULL)return -1;
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(st);
if(plug_mgr==NULL || plug_mgr->stellar_mq_schema_array == NULL)return -1;
int tid = stellar_get_current_thread_index();
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1;
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1;
if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,priority)==0)
{
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1;
return 0;
}
return -1;
unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
msg->st=plug_mgr->st;
msg->header.topic_id = topic_id;
msg->header.priority = priority;
msg->body = data;
DL_APPEND(plug_mgr->per_thread_data[tid].priority_mq[priority], msg);
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1;
return 0;
}
int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data)
int stellar_mq_publish_message(struct stellar *st, int topic_id, void *data)
{
return packet_mq_publish_message_with_priority(pkt, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
}
/*******************************
* SESSION MQ *
*******************************/
int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum stellar_mq_priority priority)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
assert(plug_mgr_rt);
if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
if(plug_mgr_rt->pub_session_msg_cnt == -1)return -1;
if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1;
int tid = stellar_get_current_thread_index();
if(stellar_mq_publish_message(ON_SESSION_TOPIC ,topic_id, data, plug_mgr_rt->plug_mgr->stellar_mq_schema_array,plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq,priority)==0)
{
plug_mgr_rt->pub_session_msg_cnt+=1;
return 0;
}
return -1;
}
int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
return session_mq_publish_message_with_priority(sess, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
}
static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic)
{
//update topic status
switch (bitmap_is_all_zero(plug_mgr_rt->session_mq_status, 0, topic->topic_id, topic->subscriber_cnt))
{
case 1:
bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 0);
break;
case 0:
bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 1);
break;
default:
break;
}
return;
}
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
{
if(bit_value!=0 && bit_value!=1)return -1;
if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
if(topic_id < 0 || plugin_id < 0)return -1;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return -1;
if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id);
if(session_plugin_schema==NULL)return -1;
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
if(plug_mgr_rt->session_mq_status)
{
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
if(topic_id==session_plugin_sub_info->topic_id)
{
bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx, topic_id, bit_value);
}
}
session_mq_update_topic_status(plug_mgr_rt, topic);
return 0;
}
return -1;
}
int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id)
{
return session_mq_set_message_status(sess, topic_id, plugin_id, 0);
}
int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id)
{
return session_mq_set_message_status(sess, topic_id, plugin_id, 1);
}
int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id)
{
if(plugin_id >= PACKET_PULGIN_ID_BASE || plugin_on_msg_cb == NULL)return -1;// ignore packet plugin
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr == NULL || plug_mgr->registered_session_plugin_array == NULL)return -1;
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id);
if(session_plugin_schema==NULL)return -1;
if(session_plugin_schema->registed_session_mq_subscriber_info==NULL)
{
utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
}
//session plugin id equals to plugin idx
return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_id, session_plugin_schema->registed_session_mq_subscriber_info);
}
int session_mq_topic_is_active(struct session *sess, int topic_id)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
assert(plug_mgr_rt);
if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range
if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0;
return 1;
return stellar_mq_publish_message_with_priority(st, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
}
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
static struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
{
struct stellar_exdata *exdata_rt = NULL;
if(plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
@@ -805,11 +590,12 @@ static struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_s
if(len > 0)
{
exdata_rt=CALLOC(struct stellar_exdata, len);
exdata_rt->plug_mgr=plug_mgr;
}
return exdata_rt;
}
static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct stellar_exdata *exdata_rt)
void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct stellar_exdata *exdata_rt)
{
if(exdata_rt==NULL)return;
if(plug_mgr->stellar_exdata_schema_array==NULL)return;
@@ -827,55 +613,7 @@ static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr,
}
}
}
}
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess)
{
struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1);
rt->plug_mgr = plug_mgr;
rt->sess = sess;
rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->stellar_mq_topic_num, 1);
rt->session_topic_status=bitmap_new(1, plug_mgr->stellar_mq_topic_num, 1);
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
if(plug_mgr->registered_session_plugin_array)
rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
return rt;
}
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
{
if(rt==NULL)return;
if(rt->session_mq_status != NULL)
{
bitmap_free(rt->session_mq_status);
rt->session_mq_status=NULL;
}
if(rt->session_topic_status != NULL)
{
bitmap_free(rt->session_topic_status);
rt->session_topic_status=NULL;
}
if (rt->plug_mgr->registered_session_plugin_array)
{
unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array);
for (unsigned int i = 0; i < len; i++)
{
struct session_plugin_ctx_runtime *plugin_ctx_rt = (rt->plugin_ctx_array + i);
struct registered_session_plugin_schema *session_plugin_schema =
(struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i);
if (session_plugin_schema->on_ctx_free && plugin_ctx_rt->state == ACTIVE)
{
session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx,
session_plugin_schema->plugin_env);
}
}
FREE(rt->plugin_ctx_array);
}
session_exdata_runtime_free(rt->plug_mgr, rt->sess_exdata_array);
FREE(rt->sess_exdata_array);
FREE(rt);
FREE(exdata_rt);
}
@@ -884,7 +622,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
*********************************************/
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env)
int stellar_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->registered_packet_plugin_array == NULL)
@@ -898,7 +636,7 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, p
packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output;
packet_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE
return (utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index
}
static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out)
@@ -921,7 +659,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
p->on_packet[in_out](pkt, ip_proto, p->plugin_env);
}
}
stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue);
return;
}
@@ -958,7 +696,7 @@ int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func o
polling_plugin_schema.on_polling = on_polling;
polling_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema);
return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array)-1);// return polling plugin_id, equals to polling plugin arrary index + POLLING_PULGIN_ID_BASE
return (utarray_len(plug_mgr->registered_polling_plugin_array)-1);// return polling plugin_id, equals to polling plugin arrary index + POLLING_PULGIN_ID_BASE
}
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
@@ -978,192 +716,3 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
}
return polling_state;
}
/*********************************************
* PLUGIN MANAGER SESSION PLUGIN *
*********************************************/
UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL};
int stellar_session_plugin_register_with_hooks(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
on_session_new_func session_on_new,
on_session_free_func session_on_free,
void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->registered_session_plugin_array == NULL)
{
utarray_new(plug_mgr->registered_session_plugin_array, &registered_session_plugin_schema_icd);
}
struct registered_session_plugin_schema session_plugin_schema;
memset(&session_plugin_schema, 0, sizeof(struct registered_session_plugin_schema));
session_plugin_schema.on_ctx_new = session_ctx_new;
session_plugin_schema.on_ctx_free = session_ctx_free;
session_plugin_schema.on_session_new = session_on_new;
session_plugin_schema.on_session_free = session_on_free;
session_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_session_plugin_array, &session_plugin_schema);
return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
}
int stellar_session_plugin_register(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
void *plugin_env)
{
return stellar_session_plugin_register_with_hooks(st, session_ctx_new, session_ctx_free, NULL, NULL, plugin_env);
}
void plugin_manager_on_session_input(struct session *sess, struct packet *pkt)
{
if(sess==NULL)return;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
plug_mgr_rt->pub_session_msg_cnt=0;
int tid=stellar_get_current_thread_index();
plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, sess);
struct tcp_segment *seg;
enum session_type type = session_get_type(sess);
if (packet_is_ctrl(pkt))
{
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
}
else
{
switch (type)
{
case SESSION_TYPE_TCP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_input_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
while ((seg = session_get_tcp_segment(sess)) != NULL)
{
if(session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id, (void *)seg, STELLAR_MQ_PRIORITY_HIGH)!=0)
{
session_free_tcp_segment(sess, seg);
}
}
break;
case SESSION_TYPE_UDP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_input_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
break;
default:
assert(0);
break;
}
}
//TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, NULL);
return;
}
void plugin_manager_on_session_output(struct session *sess, struct packet *pkt)
{
if(sess==NULL)return;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
if(unlikely(packet_is_ctrl(pkt)))return;
int tid=stellar_get_current_thread_index();
plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, sess);
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_output_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH);
break;
case SESSION_TYPE_UDP:
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_output_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH);
break;
default:
assert(0);
break;
}
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
plugin_manager_scratch_session_set(plug_mgr_rt->plug_mgr, tid, NULL);
return;
}
void plugin_manager_on_session_new(struct plugin_manager_schema *plug_mgr, struct session *sess)
{
if(sess==NULL)return;
if(plug_mgr->registered_session_plugin_array==NULL)return;
struct plugin_manager_runtime *plug_mgr_rt = plugin_manager_session_runtime_new(plug_mgr, sess);
session_set_user_data(sess, plug_mgr_rt);
struct registered_session_plugin_schema *s = NULL;
struct session_plugin_ctx_runtime *plugin_ctx_rt;
for(unsigned int i = 0; i < utarray_len(plug_mgr_rt->plug_mgr->registered_session_plugin_array); i++)
{
s = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, i);
if (s->on_session_new)
{
plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + i);
plugin_manager_runtime_update_plugin_ctx(sess, s, plugin_ctx_rt);
s->on_session_new(sess, plugin_ctx_rt->plugin_ctx, s->plugin_env);
}
}
return;
}
void plugin_manager_on_session_free(struct session *sess)
{
if(sess==NULL)return;
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
if(plug_mgr_rt->plug_mgr->registered_session_plugin_array==NULL)return;
plug_mgr_rt->pub_session_msg_cnt=0;// reset pub_msg_cnt
struct registered_session_plugin_schema *session_plugin_schema = NULL;
struct session_plugin_ctx_runtime *plugin_ctx_rt;
for(unsigned int i = 0; i < utarray_len(plug_mgr_rt->plug_mgr->registered_session_plugin_array); i++)
{
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, i);
plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + i);
if (session_plugin_schema->on_session_free && plugin_ctx_rt->state != EXIT)//dettached session plugin do not call on_session_free
{
plugin_manager_runtime_update_plugin_ctx(sess, session_plugin_schema, plugin_ctx_rt);
session_plugin_schema->on_session_free(sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
}
}
int tid=stellar_get_current_thread_index();
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL);
plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
plugin_manager_session_runtime_free(plug_mgr_rt);
return;
}
void stellar_session_plugin_dettach_current_session(struct session *sess)
{
struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id);
if(session_plugin_schema==NULL)return;
struct stellar_mq_topic_schema *topic=NULL;
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
//Won't Do: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch
//allow plugin register with null ctx_new and ctx_free
if(plug_mgr_rt->session_mq_status)
{
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0);
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id);
session_mq_update_topic_status(plug_mgr_rt, topic);
}
}
//dettach in ctx INIT, do not call on_ctx_free immidiately
if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && (session_plugin_schema->on_ctx_free))
{
session_plugin_schema->on_ctx_free(sess, plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx, session_plugin_schema->plugin_env);
plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx=NULL;
}
plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT;
return;
}

View File

@@ -18,12 +18,9 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str
//return polling work state, 0: idle, 1: working
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
//publish and dispatch session msg(msg, pkt) on session_mq
void plugin_manager_on_session_input(struct session *sess,struct packet *pkt);
void plugin_manager_on_session_output(struct session *sess,struct packet *pkt);
void plugin_manager_on_session_free(struct session *sess);
void plugin_manager_on_session_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
struct stellar_exdata;
struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr);
void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct stellar_exdata *exdata_rt);
#ifdef __cplusplus
}

View File

@@ -11,7 +11,6 @@ extern "C"
#include "stellar/stellar_mq.h"
#include "stellar/stellar_exdata.h"
#include "bitmap/bitmap.h"
#include "uthash/utarray.h"
@@ -38,18 +37,10 @@ struct plugin_manager_schema
UT_array *plugin_load_specs_array;
UT_array *stellar_exdata_schema_array;
UT_array *stellar_mq_schema_array;
UT_array *registered_session_plugin_array;
UT_array *registered_packet_plugin_array;
UT_array *registered_polling_plugin_array;
int stellar_mq_topic_num;
int packet_topic_subscriber_num;
int session_topic_subscriber_num;
int tcp_input_topic_id;
int tcp_output_topic_id;
int tcp_stream_topic_id;
int udp_input_topic_id;
int udp_output_topic_id;
int control_packet_topic_id;
int mq_topic_subscriber_num;
int max_message_dispatch;
struct plugin_manager_per_thread_data *per_thread_data;
}__attribute__((aligned(sizeof(void*))));
@@ -59,12 +50,11 @@ enum plugin_exdata_state
struct stellar_exdata
{
struct plugin_manager_schema *plug_mgr;
void *exdata;
enum plugin_exdata_state state;
};
struct stellar_exdata_schema
{
char *name;
@@ -75,18 +65,12 @@ struct stellar_exdata_schema
}__attribute__((aligned(sizeof(void*))));
enum stellar_topic_type
{
ON_SESSION_TOPIC,
ON_PACKET_TOPIC,
};
struct stellar_message
{
struct stellar *st;
struct
{
int topic_id;
enum stellar_topic_type type;
enum stellar_mq_priority priority;
} header;
void *body;
@@ -97,12 +81,7 @@ typedef struct stellar_mq_subscriber
{
int topic_subscriber_idx;
int plugin_idx;
union
{
on_session_msg_cb_func *sess_msg_cb;
on_packet_msg_cb_func *pkt_msg_cb;
void *msg_cb;
};
on_msg_cb_func *plugin_msg_cb;
struct stellar_mq_subscriber *next, *prev;
}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
@@ -119,28 +98,6 @@ struct stellar_mq_topic_schema
}__attribute__((aligned(sizeof(void*))));
struct session_plugin_ctx_runtime
{
enum plugin_exdata_state state;
int session_plugin_id;
void *plugin_ctx;
}__attribute__((aligned(sizeof(void*))));
struct plugin_manager_runtime
{
struct plugin_manager_schema *plug_mgr;
struct session *sess;
struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber
struct bitmap *session_topic_status; //N bits, N topic
struct stellar_exdata *sess_exdata_array;
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
int current_session_plugin_id;
int pub_session_msg_cnt;
}__attribute__((aligned(sizeof(void*))));
enum packet_stage
{
PACKET_STAGE_INPUT=0,
@@ -168,19 +125,6 @@ struct stellar_mq_subscriber_info
int subscriber_idx;
}__attribute__((aligned(sizeof(void*))));
struct registered_session_plugin_schema
{
session_ctx_new_func *on_ctx_new;
session_ctx_free_func *on_ctx_free;
on_session_new_func *on_session_new;
on_session_free_func *on_session_free;
void *plugin_env;
UT_array *registed_session_mq_subscriber_info;
}__attribute__((aligned(sizeof(void*))));
#define SESSION_PULGIN_ID_BASE 0x00000
#define PACKET_PULGIN_ID_BASE 0x10000
#define POLLING_PULGIN_ID_BASE 0x20000
/*******************************
* PLUGIN MANAGER INIT & EXIT *

File diff suppressed because it is too large Load Diff

View File

@@ -5,7 +5,7 @@ extern "C"
{
#endif
#include "plugin_manager_interna.h"
#include "plugin_manager/plugin_manager_interna.h"
#include "stellar/session.h"
#include "tuple.h"
@@ -37,7 +37,7 @@ struct packet
struct session
{
struct plugin_manager_runtime *plug_mgr_rt;
struct stellar_exdata *session_exdat_rt;
enum session_type type;
enum session_state state;
int sess_pkt_cnt;
@@ -54,7 +54,7 @@ int stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema *
return 0;
}
int stellar_get_worker_thread_num(struct stellar *st)
int stellar_get_worker_thread_num(struct stellar *st __attribute__((unused)))
{
return 16;
}
@@ -77,12 +77,12 @@ enum session_type session_get_type(const struct session *sess)
void session_set_user_data(struct session *sess, void *user_data)
{
sess->plug_mgr_rt = (struct plugin_manager_runtime *)user_data;
sess->session_exdat_rt = (struct stellar_exdata *)user_data;
}
void *session_get_user_data(const struct session *sess)
{
return sess->plug_mgr_rt;
return sess->session_exdat_rt;
}
void *packet_get_user_data(const struct packet *pkt)
@@ -96,21 +96,11 @@ int packet_get_innermost_tuple6(const struct packet *pkt, struct tuple6 *tuple)
return 0;
}
uint8_t packet_is_ctrl(const struct packet *pkt)
uint8_t packet_is_ctrl(const struct packet *pkt __attribute__((unused)))
{
return 0;
}
struct tcp_segment *session_get_tcp_segment(struct session *sess)
{
return NULL;
}
void session_free_tcp_segment(struct session *sess, struct tcp_segment *seg)
{
return;
}
#ifdef __cplusplus
}
#endif

View File

@@ -142,7 +142,7 @@ extern "C" void *gtest_lpi_plugin_load(struct stellar *st)
perror("gtest_lpi_plugin_load:l7_protocol_mapper failed !!!\n");
exit(-1);
}
env->test_app_plugin_id=stellar_session_plugin_register_with_hooks(st, NULL, NULL, NULL, gtest_lpi_on_session_free, env);
env->test_app_plugin_id=stellar_plugin_register(st, NULL, NULL, NULL, gtest_lpi_on_session_free, env);
if(env->test_app_plugin_id < 0)
{
perror("gtest_lpi_plugin_load:stellar_plugin_register failed !!!\n");