feat(infra/exdata): exdata as independent component

This commit is contained in:
yangwei
2024-09-06 18:36:24 +08:00
parent 3de8bbdabc
commit a24214cbee
12 changed files with 270 additions and 177 deletions

View File

@@ -1,5 +1,5 @@
#include "plugin_manager_interna.h"
#include "stellar/session.h"
#include "stellar/stellar_exdata.h"
#include "stellar/utils.h"
#include "toml/toml.h"
#include "uthash/utlist.h"
@@ -10,7 +10,6 @@
#include "packet_private.h"
#include "session_private.h"
#include <stdbool.h>
void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
{
@@ -104,7 +103,7 @@ static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread
for (int i = 0; i < thread_num; i++)
{
p_data=per_thread_data+i;
if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array);
exdata_handle_free(p_data->exdata_array);
}
FREE(per_thread_data);
return;
@@ -129,6 +128,8 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
plug_mgr->st = st;
stellar_set_plugin_manger(st, plug_mgr);
plug_mgr->exdata_schema=exdata_schema_new();
for(int i = 0; i < spec_num; i++)
{
if (specs[i].load_cb != NULL)
@@ -163,7 +164,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->stellar_mq_schema_array);
}
if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
//if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
if(plug_mgr->registered_packet_plugin_array)
{
@@ -175,6 +176,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
utarray_free(plug_mgr->registered_packet_plugin_array);
}
plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
exdata_schema_free(plug_mgr->exdata_schema);
FREE(plug_mgr);
return;
}
@@ -183,124 +185,47 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
* STELLAR EXDATA *
*******************************/
static void stellar_exdata_met_copy(void *_dst, const void *_src)
{
struct stellar_exdata_schema *dst = (struct stellar_exdata_schema *)_dst, *src = (struct stellar_exdata_schema *)_src;
dst->free_func = src->free_func;
dst->free_arg = src->free_arg;
dst->idx = src->idx;
dst->name = src->name ? strdup(src->name) : NULL;
}
static void stellar_exdata_met_dtor(void *_elt)
{
struct stellar_exdata_schema *elt = (struct stellar_exdata_schema *)_elt;
if (elt->name)
FREE(elt->name);
}
UT_icd stellar_exdata_meta_icd = {sizeof(struct stellar_exdata_schema), NULL, stellar_exdata_met_copy, stellar_exdata_met_dtor};
int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg)
{
if(st==NULL || name==NULL)return -1;
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->stellar_exdata_schema_array == NULL)
{
utarray_new(plug_mgr->stellar_exdata_schema_array, &stellar_exdata_meta_icd);
}
if(plug_mgr->stellar_exdata_schema_array == NULL)return -1;
unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
struct stellar_exdata_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
t_schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i);
if(strcmp(t_schema->name, name) == 0)
{
t_schema->free_func=free_func;
t_schema->free_arg=free_arg;
return t_schema->idx;
}
}
struct stellar_exdata_schema new_schema;
memset(&new_schema, 0, sizeof(struct stellar_exdata_schema));
new_schema.free_func=free_func;
new_schema.name=(char *)name;
new_schema.idx=len;
new_schema.free_arg=free_arg;
utarray_push_back(plug_mgr->stellar_exdata_schema_array, &new_schema);
return new_schema.idx;
}
int stellar_exdata_set(UT_array *exdata_schema, struct stellar_exdata *exdata_array, int idx, void *ex_ptr)
{
if(exdata_schema == NULL|| exdata_array == NULL)return -1;
unsigned int len=utarray_len(exdata_schema);
if(len < (unsigned int)idx)return -1;
if((exdata_array+idx)->state == EXIT)return -1;
(exdata_array+idx)->exdata=ex_ptr;
return 0;
}
void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_array, int idx)
{
if(exdata_schema == NULL|| exdata_array == NULL)return NULL;
unsigned int len = utarray_len(exdata_schema);
if(len < (unsigned int)idx)return NULL;
if((exdata_array+idx)->state == EXIT)return NULL;
return (exdata_array+idx)->exdata;
if(plug_mgr->exdata_schema==NULL)return -1;
return exdata_new_index(plug_mgr->exdata_schema, name, free_func, free_arg);
}
/*******************************
* PACKET EXDATA *
*******************************/
static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
static struct exdata_handle *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
{
if(plug_mgr==NULL || plug_mgr->stellar_exdata_schema_array == NULL)return NULL;
if(plug_mgr==NULL || plug_mgr->exdata_schema == NULL)return NULL;
int tid=stellar_get_current_thread_index();
if(plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array == NULL)
if(plug_mgr->per_thread_data[tid].exdata_array == NULL)
{
unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array = CALLOC(struct stellar_exdata, len);
plug_mgr->per_thread_data[tid].exdata_array = exdata_handle_new(plug_mgr->exdata_schema);
}
return plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array;
return plug_mgr->per_thread_data[tid].exdata_array;
}
static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr)
{
if(plug_mgr==NULL || plug_mgr->stellar_exdata_schema_array == NULL)return;
unsigned int len=utarray_len(plug_mgr->stellar_exdata_schema_array);
struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr);
if(per_thread_pkt_exdata_arrary == NULL)return;
for (unsigned int i = 0; i < len; i++)
{
void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata;
(per_thread_pkt_exdata_arrary + i)->state=EXIT;
struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i);
if (exdata)
{
if (schema->free_func)
{
schema->free_func(i, exdata, schema->free_arg);
}
(per_thread_pkt_exdata_arrary + i)->exdata=NULL;
}
(per_thread_pkt_exdata_arrary + i)->state=INIT;
}
if(plug_mgr==NULL || plug_mgr->exdata_schema == NULL)return;
struct exdata_handle *per_thread_exdata_handle = per_thread_packet_exdata_arrary_get(plug_mgr);
return exdata_handle_reset(per_thread_exdata_handle);
}
int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr)
{
if(pkt == NULL)return -1;
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
return stellar_exdata_set(plug_mgr->stellar_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr);
return exdata_set(per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr);
}
void *packet_exdata_get(struct packet *pkt, int idx)
{
if(pkt == NULL)return NULL;
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
return stellar_exdata_get( plug_mgr->stellar_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx);
return exdata_get( per_thread_packet_exdata_arrary_get(plug_mgr), idx);
}
/*******************************
@@ -309,18 +234,16 @@ void *packet_exdata_get(struct packet *pkt, int idx)
int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
{
struct stellar_exdata *sess_exdata_array = (struct stellar_exdata *)session_get_user_data(sess);
if(sess_exdata_array == NULL)return -1;
if(sess_exdata_array->plug_mgr->stellar_exdata_schema_array == NULL)return -1;
return stellar_exdata_set(sess_exdata_array->plug_mgr->stellar_exdata_schema_array, sess_exdata_array, idx, ex_ptr);
struct exdata_handle *sess_exdata = (struct exdata_handle *)session_get_user_data(sess);
if(sess_exdata == NULL)return -1;
return exdata_set(sess_exdata,idx, ex_ptr);
}
void *session_exdata_get(struct session *sess, int idx)
{
struct stellar_exdata *sess_exdata_array = (struct stellar_exdata *)session_get_user_data(sess);
if(sess_exdata_array == NULL)return NULL;
if(sess_exdata_array->plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
return stellar_exdata_get(sess_exdata_array->plug_mgr->stellar_exdata_schema_array, sess_exdata_array, idx);
struct exdata_handle *sess_exdata = (struct exdata_handle *)session_get_user_data(sess);
if(sess_exdata == NULL)return NULL;
return exdata_get(sess_exdata, idx);
}
/*******************************
@@ -588,39 +511,15 @@ int stellar_mq_publish_message(struct stellar *st, int topic_id, void *data)
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
struct stellar_exdata *session_exdata_runtime_new(struct stellar *st)
struct exdata_handle *session_exdata_runtime_new(struct stellar *st)
{
struct stellar_exdata *exdata_rt = NULL;
struct plugin_manager_schema *plug_mgr=stellar_get_plugin_manager(st);
if(plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
if(len > 0)
{
exdata_rt=CALLOC(struct stellar_exdata, len);
exdata_rt->plug_mgr=plug_mgr; // TODO: temporarily set plug_mgr in exdata[0]
}
return exdata_rt;
return exdata_handle_new(plug_mgr->exdata_schema);
}
void session_exdata_runtime_free(struct stellar_exdata *exdata_rt)
void session_exdata_runtime_free(struct exdata_handle *exdata_h)
{
if(exdata_rt==NULL || exdata_rt->plug_mgr == NULL)return;
if(exdata_rt->plug_mgr->stellar_exdata_schema_array==NULL)return;
unsigned int len=utarray_len(exdata_rt->plug_mgr->stellar_exdata_schema_array);
for (unsigned int i = 0; i < len; i++)
{
void *exdata = (exdata_rt + i)->exdata;
(exdata_rt + i)->state=EXIT;
struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(exdata_rt->plug_mgr->stellar_exdata_schema_array, i);
if (exdata)
{
if (schema->free_func)
{
schema->free_func(i, exdata, schema->free_arg);
}
}
}
FREE(exdata_rt);
return exdata_handle_free(exdata_h);
}