EX_data系列函数使用uthash替代MESA_htable,实现哈希表的动态伸缩。

This commit is contained in:
zhengchao
2020-06-28 22:10:33 +08:00
parent f112c16d2c
commit 207f99714e
6 changed files with 148 additions and 153 deletions

View File

@@ -992,7 +992,7 @@ int Maat_table_callback_register(Maat_feather_t feather,short table_id,
const char* line=NULL;
struct Maat_table_runtime* table_rt=NULL;
table_rt=Maat_table_runtime_get(_feather->scanner->table_rt_mgr, table_id);
long long line_cnt=Maat_table_runtime_plugin_cached_line_count(table_rt);
long long line_cnt=Maat_table_runtime_plugin_cached_row_count(table_rt);
if(line_cnt>0)
{
@@ -1002,7 +1002,7 @@ int Maat_table_callback_register(Maat_feather_t feather,short table_id,
}
for(i=0; i<line_cnt; i++)
{
line=Maat_table_runtime_plugin_get_cached_line(table_rt, i);
line=Maat_table_runtime_plugin_get_cached_row(table_rt, i);
if(line==NULL)
{
break;

View File

@@ -2,142 +2,164 @@
#include "Maat_table.h"
#include "Maat_utils.h"
#include "uthash/uthash.h"
#include "uthash/utarray.h"
#include <MESA/MESA_handle_logger.h>
#include <assert.h>
void EX_data_container_free(void *data)
struct EX_data_container
{
struct EX_data_container* wrap_data=(struct EX_data_container*)data;
const struct EX_data_schema* ex_schema=wrap_data->rt->ex_schema;
ex_schema->free_func(wrap_data->rt->table_id, &(wrap_data->ex_data), ex_schema->argl, ex_schema->argp);
if(wrap_data->user_data && wrap_data->rt->user_data_free)
void* ex_data;
char* key;
size_t key_len;
void* user_data;
UT_hash_handle hh;
const struct EX_data_rt* rt;
};
struct EX_data_rt
{
UT_array *cache_rows;
size_t cache_row_num;
size_t cache_size;
struct EX_data_container* hash_key2ex;
const struct EX_data_schema* ex_schema;
int table_id;
void (* user_data_free)(void *user_data);
};
void EX_data_container_free(struct EX_data_container* ex_container)
{
const struct EX_data_schema* ex_schema=ex_container->rt->ex_schema;
ex_schema->free_func(ex_container->rt->table_id, &(ex_container->ex_data), ex_schema->argl, ex_schema->argp);
if(ex_container->user_data && ex_container->rt->user_data_free)
{
wrap_data->rt->user_data_free(wrap_data->user_data);
ex_container->rt->user_data_free(ex_container->user_data);
}
wrap_data->user_data=NULL;
wrap_data->rt=NULL;
free(wrap_data);
free(ex_container->key);
ex_container->key=NULL;
ex_container->key_len=0;
ex_container->user_data=NULL;
ex_container->rt=NULL;
free(ex_container);
return;
}
static MESA_htable_handle EX_data_hash_new(long long estimate_size, Maat_plugin_EX_key2index_func_t * key2index)
void cache_row_free(void*p)
{
MESA_htable_handle key2ex_hash=NULL;
unsigned int slot_size=1;
while(estimate_size!=0)
{
estimate_size=estimate_size>>1;
slot_size*=2;
}
if(slot_size==1)
{
slot_size=4096;
}
MESA_htable_create_args_t hargs;
memset(&hargs,0,sizeof(hargs));
hargs.thread_safe=8;
hargs.hash_slot_size = slot_size;
hargs.max_elem_num = 0;
hargs.eliminate_type = HASH_ELIMINATE_ALGO_FIFO;
hargs.expire_time = 0;
hargs.key_comp = NULL;
hargs.key2index = NULL; //Not supported yet.
hargs.recursive = 1;
hargs.data_free = EX_data_container_free;
hargs.data_expire_with_condition = NULL;
key2ex_hash=MESA_htable_create(&hargs, sizeof(hargs));
MESA_htable_print_crtl(key2ex_hash, 0);
return key2ex_hash;
free(*(char**)p);
}
UT_icd ut_cache_row_icd = {sizeof(char*), NULL, NULL, cache_row_free};
struct EX_data_rt* EX_data_rt_new(int table_id, long long estimate_size, Maat_plugin_EX_key2index_func_t * key2index, void (* user_data_free)(void *user_data))
{
struct EX_data_rt* p=ALLOC(struct EX_data_rt, 1);
p->key2ex_hash=EX_data_hash_new(estimate_size, key2index);
p->cache_rows=dynamic_array_create(4, 1024);
p->hash_key2ex=NULL;
utarray_new(p->cache_rows, &ut_cache_row_icd);
p->table_id=table_id;
p->user_data_free=user_data_free;
return p;
};
size_t EX_data_rt_get_cached_row_num(struct EX_data_rt* ex_rt)
{
return ex_rt->cache_row_num;
}
void EX_data_rt_set_schema(struct EX_data_rt* p, const struct EX_data_schema* schema)
{
p->ex_schema=schema;
}
void EX_data_rt_free(struct EX_data_rt* p)
{
struct EX_data_container* ex_container=NULL, *tmp=NULL;
HASH_ITER(hh, p->hash_key2ex, ex_container, tmp)
{
HASH_DELETE(hh, p->hash_key2ex, ex_container);
EX_data_container_free(ex_container);
}
//p->hash_key2ex's memory is freed when its last element was deleted.
if(p->cache_rows)
{
dynamic_array_destroy(p->cache_rows, free);
utarray_free(p->cache_rows);
p->cache_rows=NULL;
p->cache_row_num=0;
}
MESA_htable_destroy(p->key2ex_hash, NULL);
free(p);
return;
}
void EX_data_rt_cache_row(struct EX_data_rt* p, const char* row)
void EX_data_rt_cache_row_put(struct EX_data_rt* p, const char* row)
{
size_t len=strlen(row)+1;
char* row_copy=ALLOC(char, len);
memcpy(row_copy, row, len);
p->cache_size+=len;
dynamic_array_write(p->cache_rows, p->cache_row_num, row_copy);
utarray_push_back(p->cache_rows, &row_copy);
p->cache_row_num++;
return;
}
const char* EX_data_rt_get_cached_row(struct EX_data_rt* p, int i)
const char* EX_data_rt_cached_row_get(struct EX_data_rt* p, size_t index)
{
const char* row=NULL;
row=(const char*)dynamic_array_read(p->cache_rows, i);
return row;
const char** row=NULL;
row=(const char**)utarray_eltptr(p->cache_rows, index);
return *row;
}
void EX_data_rt_clear_row_cache(struct EX_data_rt* p)
{
dynamic_array_destroy(p->cache_rows, free);
utarray_free(p->cache_rows);
p->cache_rows=NULL;
p->cache_row_num=0;
p->cache_size=0;
}
int EX_data_rt_get_row_num(struct EX_data_rt* p)
{
return p->cache_row_num;
}
struct EX_data_container* EX_data_rt_row2EX_data(struct EX_data_rt* ex_rt,
int EX_data_rt_row2EX_data(struct EX_data_rt* ex_rt,
const char* row, const char* key, size_t key_len,
void* user_data, void* logger)
{
MAAT_RULE_EX_DATA ex_data=NULL;
int ret=0;
const struct EX_data_schema* ex_schema=ex_rt->ex_schema;
struct EX_data_container* ex_container=ALLOC(struct EX_data_container, 1);
const struct EX_data_schema* ex_schema=ex_rt->ex_schema;
struct EX_data_container* ex_container=NULL, *tmp=NULL;
HASH_FIND(hh, ex_rt->hash_key2ex, key, key_len, tmp);
if(tmp!=NULL)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_module,
"EX data add error: duplicated key %.*s of %s",
key_len, key, row);
return -1;
}
ex_container=ALLOC(struct EX_data_container, 1);
ex_schema->new_func(ex_rt->table_id, key, row, &ex_data,
ex_schema->argl, ex_schema->argp);
ex_container->ex_data=ex_data;
ex_container->rt=ex_rt;
ex_container->user_data=user_data;
ret=MESA_htable_add(ex_rt->key2ex_hash, (unsigned char*)key, key_len, ex_container);
if(ret<0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_module,
"EX data add error: duplicated key %.*s of %s",
key_len, key, row);
EX_data_container_free(ex_container);
return NULL;
}
return ex_container;
ex_container->key=ALLOC(char, key_len);
memcpy(ex_container->key, key, key_len);
ex_container->key_len=key_len;
HASH_ADD_KEYPTR(hh, ex_rt->hash_key2ex, ex_container->key, ex_container->key_len, ex_container);
return 0;
}
int EX_data_rt_delete_by_row(struct EX_data_rt* ex_rt, const char* row, const char* key, size_t key_len,
void *logger)
{
int ret=0;
ret=MESA_htable_del(ex_rt->key2ex_hash, (const unsigned char*)key, key_len, NULL);
if(ret<0)
struct EX_data_container* exc=NULL;
HASH_FIND(hh, ex_rt->hash_key2ex, key, key_len, exc);
if(exc==NULL)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_module,
"EX data del error: no such key %.*s of %s",
key_len, key, row);
return -1;
}
}
HASH_DELETE(hh, ex_rt->hash_key2ex, exc);
EX_data_container_free(exc);
return 0;
}
@@ -151,8 +173,7 @@ MAAT_RULE_EX_DATA EX_data_rt_get_EX_data_by_key(struct EX_data_rt* ex_rt, const
assert(0);
return NULL;
}
container=(struct EX_data_container*)MESA_htable_search(ex_rt->key2ex_hash,
(const unsigned char*)key, strlen(key));
HASH_FIND(hh, ex_rt->hash_key2ex, key, key_len, container);
if(container!=NULL)
{
ex_rt->ex_schema->dup_func(ex_rt->table_id, &(ex_data), &(container->ex_data),
@@ -162,33 +183,26 @@ MAAT_RULE_EX_DATA EX_data_rt_get_EX_data_by_key(struct EX_data_rt* ex_rt, const
}
MAAT_RULE_EX_DATA EX_data_rt_get_EX_data_by_container(struct EX_data_rt* ex_rt, struct EX_data_container* container)
{
MAAT_RULE_EX_DATA ex_data=NULL;
ex_rt->ex_schema->dup_func(ex_rt->table_id, &(ex_data), &(container->ex_data),
MAAT_RULE_EX_DATA dupped_ex_data=NULL;
ex_rt->ex_schema->dup_func(ex_rt->table_id, &(dupped_ex_data), &(container->ex_data),
ex_rt->ex_schema->argl, ex_rt->ex_schema->argp);
return ex_data;
return dupped_ex_data;
}
struct key2EX_hash_walker
{
EX_data_container_q* listed;
size_t count;
};
void walk_key2EX_hash(const uchar * key, uint size, void * data, void * user)
size_t EX_data_rt_list_all_ex_container(struct EX_data_rt* ex_rt, struct EX_data_container*** ex_container_array)
{
struct key2EX_hash_walker *walker=(struct key2EX_hash_walker*)user;
struct EX_data_container* ex_container=(struct EX_data_container*)data;
TAILQ_INSERT_TAIL(walker->listed, ex_container, entries);
walker->count++;
return;
}
size_t EX_data_rt_list_all(struct EX_data_rt* ex_rt, EX_data_container_q* listed)
{
size_t ex_data_cnt;
struct key2EX_hash_walker walker={listed, 0};
TAILQ_INIT(listed);
MESA_htable_iterate(ex_rt->key2ex_hash, walk_key2EX_hash, &walker);
ex_data_cnt=(size_t)MESA_htable_get_elem_num(ex_rt->key2ex_hash);
assert(walker.count==ex_data_cnt);
size_t ex_data_cnt=HASH_COUNT(ex_rt->hash_key2ex), i=0;
struct EX_data_container* ex_container=NULL, *tmp=NULL;
*ex_container_array=ALLOC(struct EX_data_container*, ex_data_cnt);
HASH_ITER(hh, ex_rt->hash_key2ex, ex_container, tmp)
{
(*ex_container_array)[i]=ex_container;
i++;
}
return ex_data_cnt;
}
void* EX_data_container_get_user_data(struct EX_data_container* ex_container)
{
return ex_container->user_data;
}

View File

@@ -220,7 +220,7 @@ void maat_stat_output(struct _Maat_feather_t* feather)
switch(p_table->table_type)
{
case TABLE_TYPE_PLUGIN:
plugin_cache_num+=table_rt->plugin.ex_data_rt->cache_row_num;
plugin_cache_num+=Maat_table_runtime_plugin_cached_row_count(table_rt);
plugin_acc_num+=table_rt->plugin.acc_line_num;
break;
case TABLE_TYPE_GROUP:

View File

@@ -247,28 +247,28 @@ struct Maat_table_runtime* Maat_table_runtime_get(struct Maat_table_runtime_mana
assert(table_id<(int)table_rt_mgr->n_table_rt);
return table_rt_mgr->table_rt[table_id];
}
long long Maat_table_runtime_plugin_cached_line_count(struct Maat_table_runtime* table_rt)
size_t Maat_table_runtime_plugin_cached_row_count(struct Maat_table_runtime* table_rt)
{
struct plugin_runtime* plugin_rt=&(table_rt->plugin);
return plugin_rt->ex_data_rt->cache_row_num;
assert(table_rt->table_type==TABLE_TYPE_PLUGIN);
return EX_data_rt_get_cached_row_num(plugin_rt->ex_data_rt);
}
const char* Maat_table_runtime_plugin_get_cached_line(struct Maat_table_runtime* table_rt, long long Nth_line)
const char* Maat_table_runtime_plugin_get_cached_row(struct Maat_table_runtime* table_rt, size_t Nth_row)
{
const char* line=NULL;
struct plugin_runtime* plugin_rt=&(table_rt->plugin);
line=(const char*)dynamic_array_read(plugin_rt->ex_data_rt->cache_rows, Nth_line);
line=EX_data_rt_cached_row_get(plugin_rt->ex_data_rt, Nth_row);
return line;
}
int Maat_table_runtime_plugin_commit_ex_schema(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, void* logger)
{
int i=0;
size_t i=0;
const char* row=NULL;
EX_data_rt_set_schema(table_rt->plugin.ex_data_rt, &table_schema->plugin.ex_schema);
for(i=0; i<EX_data_rt_get_row_num(table_rt->plugin.ex_data_rt); i++)
for(i=0; i<EX_data_rt_get_cached_row_num(table_rt->plugin.ex_data_rt); i++)
{
row=EX_data_rt_get_cached_row(table_rt->plugin.ex_data_rt, i);
row=EX_data_rt_cached_row_get(table_rt->plugin.ex_data_rt, i);
Maat_table_runtime_plugin_new_row(table_rt, table_schema, row, logger);
}
return 0;
@@ -323,7 +323,7 @@ void Maat_table_runtime_plugin_new_row(struct Maat_table_runtime* table_rt, stru
}
if(!plugin_schema->have_exdata && !plugin_schema->cb_plug_cnt)
{
EX_data_rt_cache_row(plugin_rt->ex_data_rt, row);
EX_data_rt_cache_row_put(plugin_rt->ex_data_rt, row);
}
return;
@@ -425,22 +425,19 @@ int Maat_table_runtime_digest_batch_udpate(struct Maat_table_runtime* table_rt)
int Maat_table_runtime_ip_plugin_rebuild_ip_matcher(struct Maat_table_runtime* table_rt)
{
struct ip_matcher* new_ip_matcher=NULL;
struct EX_data_container_q queue;//This is for index, no need to free.
size_t rule_cnt=0;
size_t i=0, mem_use=0;
struct ip_rule *rules=NULL;
struct EX_data_container *p=NULL;
TAILQ_INIT(&queue);
rule_cnt=EX_data_rt_list_all(table_rt->ip_plugin.ex_data_rt, &queue);
struct EX_data_container **exc_array=NULL;
assert(table_rt->table_type==TABLE_TYPE_IP_PLUGIN);
rule_cnt=EX_data_rt_list_all_ex_container(table_rt->ip_plugin.ex_data_rt, &exc_array);
rules=ALLOC(struct ip_rule, rule_cnt);
TAILQ_FOREACH(p, &queue, entries)
for(i=0; i<rule_cnt; i++)
{
rules[i]=*((struct ip_rule *)(p->user_data));
assert(rules[i].user_tag==p||rules[i].user_tag==NULL);
rules[i].user_tag=p;
i++;
rules[i]=*((struct ip_rule *)(EX_data_container_get_user_data(exc_array[i])));
assert(rules[i].user_tag==exc_array[i]||rules[i].user_tag==NULL);
rules[i].user_tag=exc_array[i];
}
assert(i==rule_cnt);
if(rule_cnt>0)
{
new_ip_matcher=ip_matcher_new(rules, rule_cnt, &mem_use);
@@ -448,6 +445,8 @@ int Maat_table_runtime_ip_plugin_rebuild_ip_matcher(struct Maat_table_runtime* t
table_rt->ip_plugin.ip_matcher=new_ip_matcher;
}
free(rules);
free(exc_array);
exc_array=NULL;
return 0;
}
struct ip_matcher* Maat_table_runtime_dettach_old_ip_matcher(struct Maat_table_runtime* table_rt)
@@ -503,21 +502,22 @@ void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, s
}
else
{
EX_data_rt_cache_row(ip_plugin_rt->ex_data_rt, row);
EX_data_rt_cache_row_put(ip_plugin_rt->ex_data_rt, row);
}
return;
}
int Maat_table_runtime_ip_plugin_commit_ex_schema(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, void* logger)
{
int i=0;
size_t i=0;
const char* row=NULL;
EX_data_rt_set_schema(table_rt->ip_plugin.ex_data_rt, &table_schema->ip_plugin.ex_schema);
for(i=0; i<EX_data_rt_get_row_num(table_rt->plugin.ex_data_rt); i++)
struct ip_plugin_runtime* ip_plugin_rt=&(table_rt->ip_plugin);
EX_data_rt_set_schema(ip_plugin_rt->ex_data_rt, &table_schema->ip_plugin.ex_schema);
for(i=0; i<EX_data_rt_get_cached_row_num(ip_plugin_rt->ex_data_rt); i++)
{
row=EX_data_rt_get_cached_row(table_rt->plugin.ex_data_rt, i);
row=EX_data_rt_cached_row_get(ip_plugin_rt->ex_data_rt, i);
Maat_table_runtime_ip_plugin_new_row(table_rt, table_schema, row, logger);
}
EX_data_rt_clear_row_cache(table_rt->plugin.ex_data_rt);
EX_data_rt_clear_row_cache(ip_plugin_rt->ex_data_rt);
Maat_table_runtime_ip_plugin_rebuild_ip_matcher(table_rt);
return 0;

View File

@@ -1,45 +1,26 @@
#include "dynamic_array.h"
#include "Maat_rule.h"
#include <MESA/MESA_htable.h>
#include <sys/queue.h>
struct EX_data_rt;
struct EX_data_rt
{
dynamic_array_t *cache_rows;
long long cache_row_num;
long long cache_size;
MESA_htable_handle key2ex_hash;
const struct EX_data_schema* ex_schema;
int table_id;
void (* user_data_free)(void *user_data);
};
struct EX_data_container
{
MAAT_RULE_EX_DATA ex_data;
const struct EX_data_rt* rt;
void* user_data;
TAILQ_ENTRY(EX_data_container) entries;
};
TAILQ_HEAD(EX_data_container_q, EX_data_container);
struct EX_data_rt* EX_data_rt_new(int table_id, long long estimate_size, Maat_plugin_EX_key2index_func_t * key2index, void (* user_data_free)(void *user_data));
void EX_data_rt_free(struct EX_data_rt* p);
void EX_data_rt_set_schema(struct EX_data_rt* p, const struct EX_data_schema* schema);
void EX_data_rt_cache_row(struct EX_data_rt* p, const char* row);
void EX_data_rt_cache_row_put(struct EX_data_rt* p, const char* row);
const char* EX_data_rt_get_cached_row(struct EX_data_rt* p, int i);
const char* EX_data_rt_cached_row_get(struct EX_data_rt* p, size_t index);
void EX_data_rt_clear_row_cache(struct EX_data_rt* p);
int EX_data_rt_get_row_num(struct EX_data_rt* p);
size_t EX_data_rt_get_cached_row_num(struct EX_data_rt* p);
struct EX_data_container* EX_data_rt_row2EX_data(struct EX_data_rt* ex_rt,
int EX_data_rt_row2EX_data(struct EX_data_rt* ex_rt,
const char* row, const char* key, size_t key_len,
void* user_data, void* logger);
int EX_data_rt_delete_by_row(struct EX_data_rt* ex_rt, const char* row, const char* key, size_t key_len, void *logger);
MAAT_RULE_EX_DATA EX_data_rt_get_EX_data_by_key(struct EX_data_rt* ex_rt, const char* key, size_t key_len);
MAAT_RULE_EX_DATA EX_data_rt_get_EX_data_by_container(struct EX_data_rt* ex_rt, struct EX_data_container* container);
size_t EX_data_rt_list_all(struct EX_data_rt* ex_rt, EX_data_container_q* listed);
size_t EX_data_rt_list_all_ex_container(struct EX_data_rt* ex_rt, struct EX_data_container*** ex_container_array);
void* EX_data_container_get_user_data(struct EX_data_container* ex_container);

View File

@@ -64,8 +64,8 @@ struct Maat_table_runtime_manager;
struct Maat_table_runtime_manager* Maat_table_runtime_manager_create(struct Maat_table_manager* table_manager, int max_thread_num);
void Maat_table_rt_manager_destroy(struct Maat_table_runtime_manager* table_rt_mgr);
struct Maat_table_runtime* Maat_table_runtime_get(struct Maat_table_runtime_manager* table_rt_mgr, int table_id);
long long Maat_table_runtime_plugin_cached_line_count(struct Maat_table_runtime* table_rt);
const char* Maat_table_runtime_plugin_get_cached_line(struct Maat_table_runtime* table_rt, long long Nth_line);
size_t Maat_table_runtime_plugin_cached_row_count(struct Maat_table_runtime* table_rt);
const char* Maat_table_runtime_plugin_get_cached_row(struct Maat_table_runtime* table_rt, size_t Nth_row);
int Maat_table_runtime_plugin_commit_ex_schema(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_desc, void* logger);
MAAT_PLUGIN_EX_DATA Maat_table_runtime_plugin_get_ex_data(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_desc, const char* key);