EX_data中的hash表使用RCU更新,替代读写锁,提高多线程下的访问性能。

This commit is contained in:
zhengchao
2022-01-21 22:31:13 +05:00
parent dd86ba5fc1
commit 732a944ff4
6 changed files with 207 additions and 49 deletions

View File

@@ -1,8 +1,8 @@
cmake_minimum_required(VERSION 3.5)
set(MAAT_FRAME_MAJOR_VERSION 3)
set(MAAT_FRAME_MINOR_VERSION 5)
set(MAAT_FRAME_PATCH_VERSION 2)
set(MAAT_FRAME_MINOR_VERSION 6)
set(MAAT_FRAME_PATCH_VERSION 0)
set(MAAT_FRAME_VERSION ${MAAT_FRAME_MAJOR_VERSION}.${MAAT_FRAME_MINOR_VERSION}.${MAAT_FRAME_PATCH_VERSION})
message(STATUS "Maat Frame, Version: ${MAAT_FRAME_VERSION}")

View File

@@ -18,6 +18,7 @@ struct EX_data_container
void* user_data;
UT_hash_handle hh;
UT_hash_handle hh_a, hh_b;
const struct EX_data_rt* rt;
};
@@ -27,8 +28,11 @@ struct EX_data_rt
UT_array *cache_rows;
size_t cache_row_num;
size_t cache_size;
pthread_rwlock_t rwlock;
struct EX_data_container* hash_key2ex;
pthread_mutex_t mutex_update_commit;
int is_updating;
char effective_hash;//value 'a' or 'b', indicates which hash is effective
struct EX_data_container* hash_key2ex_a, *hash_key2ex_b; //two hash for read-copy-update (RCU)
const struct EX_data_schema* ex_schema;
int table_id;
@@ -62,12 +66,14 @@ UT_icd ut_cache_row_icd = {sizeof(char*), NULL, NULL, cache_row_free};
struct EX_data_rt* EX_data_rt_new(int table_id, Maat_plugin_EX_key2index_func_t * key2index, void (* user_data_free)(void *user_data), struct Maat_garbage_bin* bin)
{
struct EX_data_rt* p=ALLOC(struct EX_data_rt, 1);
p->hash_key2ex=NULL;
p->hash_key2ex_a=NULL;
p->hash_key2ex_b=NULL;
p->effective_hash='a';
utarray_new(p->cache_rows, &ut_cache_row_icd);
p->table_id=table_id;
p->user_data_free=user_data_free;
pthread_rwlock_init(&p->rwlock, NULL);
p->ref_bin=bin;
pthread_mutex_init(&p->mutex_update_commit, NULL);
return p;
};
size_t EX_data_rt_get_cached_row_num(struct EX_data_rt* ex_rt)
@@ -78,25 +84,38 @@ void EX_data_rt_set_schema(struct EX_data_rt* p, const struct EX_data_schema* sc
{
p->ex_schema=schema;
}
void EX_data_rt_free(struct EX_data_rt* p)
void EX_data_rt_free(struct EX_data_rt* ex_rt)
{
struct EX_data_container* ex_container=NULL, *tmp=NULL;
assert(ex_rt->is_updating==0);
if(ex_rt->effective_hash=='a')
{
HASH_ITER(hh_a, ex_rt->hash_key2ex_a, ex_container, tmp)
{
HASH_DELETE(hh_a, ex_rt->hash_key2ex_a, ex_container);
EX_data_container_free(ex_container);
}
}
else
{
HASH_ITER(hh_b, ex_rt->hash_key2ex_a, ex_container, tmp)
{
HASH_DELETE(hh_b, ex_rt->hash_key2ex_a, ex_container);
EX_data_container_free(ex_container);
}
}
//ex_rt->hash_key2ex's memory is freed when its last element was deleted.
HASH_ITER(hh, p->hash_key2ex, ex_container, tmp)
if(ex_rt->cache_rows)
{
HASH_DELETE(hh, p->hash_key2ex, ex_container);
EX_data_container_free(ex_container);
utarray_free(ex_rt->cache_rows);
ex_rt->cache_rows=NULL;
ex_rt->cache_row_num=0;
}
//p->hash_key2ex's memory is freed when its last element was deleted.
if(p->cache_rows)
{
utarray_free(p->cache_rows);
p->cache_rows=NULL;
p->cache_row_num=0;
}
pthread_rwlock_destroy(&p->rwlock);
p->ref_bin=NULL;
free(p);
ex_rt->ref_bin=NULL;
free(ex_rt);
return;
}
void EX_data_rt_cache_row_put(struct EX_data_rt* p, const char* row)
@@ -122,7 +141,118 @@ void EX_data_rt_clear_row_cache(struct EX_data_rt* p)
p->cache_row_num=0;
p->cache_size=0;
}
void EX_data_rt_update_prepare(struct EX_data_rt* ex_rt)
{
struct EX_data_container* ex_container=NULL, *tmp=NULL;
if(ex_rt->effective_hash=='a')
{
assert(ex_rt->hash_key2ex_b==NULL);
HASH_ITER(hh_a, ex_rt->hash_key2ex_a, ex_container, tmp)
{
HASH_ADD_KEYPTR(hh_b, ex_rt->hash_key2ex_b, ex_container->key, ex_container->key_len, ex_container);
}
}
else
{
assert(ex_rt->hash_key2ex_a==NULL);
HASH_ITER(hh_b, ex_rt->hash_key2ex_b, ex_container, tmp)
{
HASH_ADD_KEYPTR(hh_a, ex_rt->hash_key2ex_a, ex_container->key, ex_container->key_len, ex_container);
}
}
ex_rt->is_updating=1;
return;
}
void EX_data_rt_update_commit(struct EX_data_rt* ex_rt)
{
struct EX_data_container* ex_container=NULL, *tmp=NULL;
//lock contention of Maat_plugin_EX_register and rule update thread.
pthread_mutex_lock(&ex_rt->mutex_update_commit);
if(!ex_rt->is_updating)
{
pthread_mutex_unlock(&ex_rt->mutex_update_commit);
return;
}
if(ex_rt->effective_hash=='a')
{
ex_rt->effective_hash='b';
usleep(100); //urgly sleep, wait for EX_data_rt_get_EX_data_by_key to release the effective hash table.
HASH_ITER(hh_a, ex_rt->hash_key2ex_a, ex_container, tmp)
{
HASH_DELETE(hh_a, ex_rt->hash_key2ex_a, ex_container);
//ex_container is no need to free, it's either in new hash table or garbage collection queue.
}
}
else
{
ex_rt->effective_hash='a';
usleep(100);
HASH_ITER(hh_b, ex_rt->hash_key2ex_b, ex_container, tmp)
{
HASH_DELETE(hh_b, ex_rt->hash_key2ex_b, ex_container);
}
}
ex_rt->is_updating=0;
pthread_mutex_unlock(&ex_rt->mutex_update_commit);
return;
}
struct EX_data_container* EX_data_rt_effective_hash_find(struct EX_data_rt* ex_rt, const char* key, size_t key_len)
{
struct EX_data_container* exc=NULL;
if(ex_rt->effective_hash=='a')
{
HASH_FIND(hh_a, ex_rt->hash_key2ex_a, key, key_len, exc);
}
else
{
assert(ex_rt->effective_hash=='b');
HASH_FIND(hh_b, ex_rt->hash_key2ex_b, key, key_len, exc);
}
return exc;
}
struct EX_data_container* EX_data_rt_updating_hash_find(struct EX_data_rt* ex_rt, const char* key, size_t key_len)
{
struct EX_data_container* exc=NULL;
if(ex_rt->effective_hash=='a')
{
HASH_FIND(hh_b, ex_rt->hash_key2ex_b, key, key_len, exc);
}
else
{
assert(ex_rt->effective_hash=='b');
HASH_FIND(hh_a, ex_rt->hash_key2ex_a, key, key_len, exc);
}
return exc;
}
void EX_data_rt_updating_hash_add(struct EX_data_rt* ex_rt, const char* key, size_t key_len, struct EX_data_container* exc)
{
if(ex_rt->effective_hash=='a')
{
HASH_ADD_KEYPTR(hh_b, ex_rt->hash_key2ex_b, key,key_len, exc);
}
else
{
HASH_ADD_KEYPTR(hh_a, ex_rt->hash_key2ex_b, key,key_len, exc);
}
return;
}
void EX_data_rt_updating_hash_del(struct EX_data_rt* ex_rt, struct EX_data_container* exc)
{
if(ex_rt->effective_hash=='a')
{
HASH_DELETE(hh_b, ex_rt->hash_key2ex_b, exc);
}
else
{
HASH_DELETE(hh_a, ex_rt->hash_key2ex_a, exc);
}
return;
}
int EX_data_rt_row2EX_data(struct EX_data_rt* ex_rt,
const char* row, const char* key, size_t key_len,
void* user_data, void* logger)
@@ -132,9 +262,11 @@ int EX_data_rt_row2EX_data(struct EX_data_rt* ex_rt,
const struct EX_data_schema* ex_schema=ex_rt->ex_schema;
struct EX_data_container* ex_container=NULL, *tmp=NULL;
int ret=0;
pthread_rwlock_wrlock(&ex_rt->rwlock);
HASH_FIND(hh, ex_rt->hash_key2ex, key, key_len, tmp);
if(!ex_rt->is_updating)
{
EX_data_rt_update_prepare(ex_rt);
}
tmp=EX_data_rt_updating_hash_find(ex_rt, key, key_len);
if(tmp==NULL)
{
ex_container=ALLOC(struct EX_data_container, 1);
@@ -147,7 +279,7 @@ int EX_data_rt_row2EX_data(struct EX_data_rt* ex_rt,
ex_container->user_data=user_data;
ex_container->key_len=key_len;
HASH_ADD_KEYPTR(hh, ex_rt->hash_key2ex, ex_container->key, ex_container->key_len, ex_container);
EX_data_rt_updating_hash_add(ex_rt, ex_container->key, ex_container->key_len, ex_container);
ret=0;
}
else
@@ -158,8 +290,6 @@ int EX_data_rt_row2EX_data(struct EX_data_rt* ex_rt,
ret=-1;
}
pthread_rwlock_unlock(&ex_rt->rwlock);
return ret;
}
int EX_data_rt_delete_by_row(struct EX_data_rt* ex_rt, const char* row, const char* key, size_t key_len,
@@ -167,11 +297,14 @@ int EX_data_rt_delete_by_row(struct EX_data_rt* ex_rt, const char* row, const ch
{
struct EX_data_container* exc=NULL;
int ret=0;
pthread_rwlock_wrlock(&ex_rt->rwlock);
HASH_FIND(hh, ex_rt->hash_key2ex, key, key_len, exc);
if(!ex_rt->is_updating)
{
EX_data_rt_update_prepare(ex_rt);
}
exc=EX_data_rt_updating_hash_find(ex_rt, key, key_len);
if(exc)
{
HASH_DELETE(hh, ex_rt->hash_key2ex, exc);
EX_data_rt_updating_hash_del(ex_rt, exc);
Maat_garbage_bagging(ex_rt->ref_bin, exc, (void (*)(void*))EX_data_container_free);
ret=0;
}
@@ -182,13 +315,12 @@ int EX_data_rt_delete_by_row(struct EX_data_rt* ex_rt, const char* row, const ch
key_len, key, row);
ret=-1;
}
pthread_rwlock_unlock(&ex_rt->rwlock);
return ret;
}
MAAT_RULE_EX_DATA EX_data_rt_get_EX_data_by_key(struct EX_data_rt* ex_rt, const char* key, size_t key_len)
{
struct EX_data_container* container=NULL;
struct EX_data_container* exc=NULL;
MAAT_RULE_EX_DATA ex_data=NULL;
if(!ex_rt->ex_schema)
@@ -196,14 +328,12 @@ MAAT_RULE_EX_DATA EX_data_rt_get_EX_data_by_key(struct EX_data_rt* ex_rt, const
assert(0);
return NULL;
}
pthread_rwlock_rdlock(&ex_rt->rwlock);
HASH_FIND(hh, ex_rt->hash_key2ex, key, key_len, container);
if(container!=NULL)
exc=EX_data_rt_effective_hash_find(ex_rt, key, key_len);
if(exc!=NULL)
{
ex_rt->ex_schema->dup_func(ex_rt->table_id, &(ex_data), &(container->ex_data),
ex_rt->ex_schema->dup_func(ex_rt->table_id, &(ex_data), &(exc->ex_data),
ex_rt->ex_schema->argl, ex_rt->ex_schema->argp);
}
pthread_rwlock_unlock(&ex_rt->rwlock);
return ex_data;
}
MAAT_RULE_EX_DATA EX_data_rt_get_EX_data_by_container(struct EX_data_rt* ex_rt, struct EX_data_container* container)
@@ -218,16 +348,27 @@ size_t EX_data_rt_list_all_ex_container(struct EX_data_rt* ex_rt, struct EX_data
{
size_t ex_data_cnt=0, i=0;
struct EX_data_container* ex_container=NULL, *tmp=NULL;
pthread_rwlock_rdlock(&ex_rt->rwlock);
ex_data_cnt=HASH_COUNT(ex_rt->hash_key2ex);
*ex_container_array=ALLOC(struct EX_data_container*, ex_data_cnt);
HASH_ITER(hh, ex_rt->hash_key2ex, ex_container, tmp)
assert(ex_rt->is_updating==0);
if(ex_rt->effective_hash=='a')
{
(*ex_container_array)[i]=ex_container;
i++;
ex_data_cnt=HASH_CNT(hh_a, ex_rt->hash_key2ex_a);
*ex_container_array=ALLOC(struct EX_data_container*, ex_data_cnt);
HASH_ITER(hh_a, ex_rt->hash_key2ex_a, ex_container, tmp)
{
(*ex_container_array)[i]=ex_container;
i++;
}
}
else
{
ex_data_cnt=HASH_CNT(hh_b, ex_rt->hash_key2ex_b);
*ex_container_array=ALLOC(struct EX_data_container*, ex_data_cnt);
HASH_ITER(hh_b, ex_rt->hash_key2ex_b, ex_container, tmp)
{
(*ex_container_array)[i]=ex_container;
i++;
}
}
pthread_rwlock_unlock(&ex_rt->rwlock);
return ex_data_cnt;
}
void* EX_data_container_get_user_data(struct EX_data_container* ex_container)
@@ -237,9 +378,14 @@ void* EX_data_container_get_user_data(struct EX_data_container* ex_container)
size_t EX_data_rt_get_ex_container_count(struct EX_data_rt* ex_rt)
{
size_t count=0;
pthread_rwlock_rdlock(&ex_rt->rwlock);
count=HASH_COUNT(ex_rt->hash_key2ex);
pthread_rwlock_unlock(&ex_rt->rwlock);
if(ex_rt->effective_hash=='a')
{
count=HASH_CNT(hh_a, ex_rt->hash_key2ex_a);
}
else
{
count=HASH_CNT(hh_b, ex_rt->hash_key2ex_b);
}
return count;
}

View File

@@ -57,7 +57,7 @@ extern "C"
}
#endif
int MAAT_FRAME_VERSION_3_5_2_20220112=1;
int MAAT_FRAME_VERSION_3_5_0_20220121=1;
int is_valid_table_name(const char* str)
{
@@ -2361,6 +2361,7 @@ void do_scanner_update(struct Maat_scanner* scanner, int scan_thread_num, void*
}
break;
case TABLE_TYPE_PLUGIN:
Maat_table_runtime_plugin_update_commit(table_rt);
break;
case TABLE_TYPE_IP_PLUGIN:
ret=Maat_table_runtime_ip_plugin_rebuild_ip_matcher(table_rt);

View File

@@ -314,6 +314,7 @@ int Maat_table_runtime_plugin_commit_ex_schema(struct Maat_table_runtime* table_
row=EX_data_rt_cached_row_get(table_rt->plugin.ex_data_rt, i);
Maat_table_runtime_plugin_new_row(table_rt, table_schema, row, logger);
}
EX_data_rt_update_commit(table_rt->plugin.ex_data_rt);
return 0;
}
MAAT_PLUGIN_EX_DATA Maat_table_runtime_plugin_get_ex_data(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, const char* key)
@@ -372,6 +373,12 @@ void Maat_table_runtime_plugin_new_row(struct Maat_table_runtime* table_rt, stru
return;
}
void Maat_table_runtime_plugin_update_commit(struct Maat_table_runtime* table_rt)
{
EX_data_rt_update_commit(table_rt->plugin.ex_data_rt);
return;
}
void Maat_table_runtime_digest_add(struct Maat_table_runtime* table_rt, int expr_id, const char* digest, short confidence_degree, void* tag)
{
GIE_digest_t* digest_rule=NULL;
@@ -477,7 +484,7 @@ int Maat_table_runtime_fqdn_plugin_rebuild_fqdn_engine(struct Maat_table_runtime
{
return ret;
}
EX_data_rt_update_commit(fqdn_rt->ex_data_rt);
rule_cnt=EX_data_rt_list_all_ex_container(fqdn_rt->ex_data_rt, &exc_array);
rules=ALLOC(struct FQDN_rule, rule_cnt);
for(i=0; i<rule_cnt; i++)
@@ -620,6 +627,7 @@ int Maat_table_runtime_ip_plugin_rebuild_ip_matcher(struct Maat_table_runtime* t
{
return ret;
}
EX_data_rt_update_commit(ip_plugin->ex_data_rt);
rule_cnt=EX_data_rt_list_all_ex_container(ip_plugin->ex_data_rt, &exc_array);
rules=ALLOC(struct ip_rule, rule_cnt);
for(i=0; i<rule_cnt; i++)

View File

@@ -5,6 +5,8 @@ struct EX_data_rt;
struct EX_data_rt* EX_data_rt_new(int table_id, Maat_plugin_EX_key2index_func_t * key2index, void (* user_data_free)(void *user_data), struct Maat_garbage_bin* bin);
void EX_data_rt_free(struct EX_data_rt* p);
void EX_data_rt_update_commit(struct EX_data_rt* ex_rt);
void EX_data_rt_set_schema(struct EX_data_rt* p, const struct EX_data_schema* schema);
void EX_data_rt_cache_row_put(struct EX_data_rt* p, const char* row);

View File

@@ -87,6 +87,7 @@ void Maat_table_runtime_digest_del(struct Maat_table_runtime* table_rt, int expr
int Maat_table_runtime_digest_batch_udpate(struct Maat_table_runtime* table_rt);
void Maat_table_runtime_plugin_new_row(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, const char* row, void *logger);
void Maat_table_runtime_plugin_update_commit(struct Maat_table_runtime* table_rt);
void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, const char* row, void *logger);