删除IP Plugin表项时,未能及时更新ip_matcher,也未能正确同步uthash和ip_matcher的状态,导致ip_matcher返回了已被删除的ex_data。解决方案:

- 在IP Plugin的table runtime中增加垃圾回收队列,延迟删除EX_data,并延后ip_matcher在扫描线程的生效时机。
- 在scanner中增加ip_plugin_update_q_size,在IP Plugin的table runtime中增加changed_flag,以判断ip_matcher是否需要更新
This commit is contained in:
zhengchao
2020-08-19 22:57:37 +08:00
parent 5931b445ff
commit a44e14f82d
7 changed files with 172 additions and 34 deletions

View File

@@ -1901,22 +1901,36 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_
, line_rule[i]->table_name); , line_rule[i]->table_name);
ret=-1; ret=-1;
goto error_out; goto error_out;
} }
p_table=Maat_table_get_by_id_raw(_feather->table_mgr, table_id); p_table=Maat_table_get_by_id_raw(_feather->table_mgr, table_id);
if(!p_table) if(!p_table)
{
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
,"Command set line id %d failed: table %s is not a plugin table."
, line_rule[i]->rule_id
{ {
ret=-1; ret=-1;
goto error_out; goto error_out;
} }
int valid_flag_column=0;
switch(p_table->table_type)
{
case TABLE_TYPE_PLUGIN:
valid_flag_column=p_table->plugin.valid_flag_column;
plugin_desc=&(p_table->plugin);
break;
case TABLE_TYPE_IP_PLUGIN:
valid_flag_column=p_table->ip_plugin.valid_flag_column;
break;
default:
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command
,"Command set line id %d failed: table %s is not a plugin or ip_plugin table."
, line_rule[i]->rule_id
, line_rule[i]->table_name);
ret=-1;
goto error_out;
}
if(op==MAAT_OP_ADD) if(op==MAAT_OP_ADD)
{ {
ret=get_valid_flag_offset(line_rule[i]->table_line ret=get_valid_flag_offset(line_rule[i]->table_line
, p_table->table_type , p_table->table_type
, valid_flag_column); , valid_flag_column);
if(ret<0|| if(ret<0||
(op==MAAT_OP_ADD&&line_rule[i]->table_line[ret]!='1')) (op==MAAT_OP_ADD&&line_rule[i]->table_line[ret]!='1'))
@@ -1935,7 +1949,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_
if(line_rule[i]->expire_after>0) if(line_rule[i]->expire_after>0)
{ {
absolute_expire_time=server_time+line_rule[i]->expire_after; absolute_expire_time=server_time+line_rule[i]->expire_after;
} }
if(plugin_desc && plugin_desc->n_foreign>0) if(plugin_desc && plugin_desc->n_foreign>0)
{ {
for(j=0;j<plugin_desc->n_foreign;j++) for(j=0;j<plugin_desc->n_foreign;j++)

View File

@@ -67,7 +67,7 @@ void Maat_garbage_collect_routine(struct Maat_garbage_bin* bin)
for(p=TAILQ_FIRST(&bin->garbage_q); p!=NULL; p=tmp) for(p=TAILQ_FIRST(&bin->garbage_q); p!=NULL; p=tmp)
{ {
tmp=TAILQ_NEXT(p, entries); tmp=TAILQ_NEXT(p, entries);
if(now-p->create_time>p->timeout) if(now-p->create_time>p->timeout || p->timeout==0)
{ {
p->garbage_free(p->garbage); p->garbage_free(p->garbage);
TAILQ_REMOVE(&bin->garbage_q, p, entries); TAILQ_REMOVE(&bin->garbage_q, p, entries);

View File

@@ -34,7 +34,7 @@
#include "stream_fuzzy_hash.h" #include "stream_fuzzy_hash.h"
#include "gram_index_engine.h" #include "gram_index_engine.h"
int MAAT_FRAME_VERSION_3_0_20200731=1; int MAAT_FRAME_VERSION_3_0_20200819=1;
int is_valid_table_name(const char* str) int is_valid_table_name(const char* str)
{ {
@@ -2119,7 +2119,6 @@ void update_digest_rule(struct Maat_table_schema* table, const char* table_line,
} }
} }
error_out: error_out:
digest_rule->digest_string=NULL; digest_rule->digest_string=NULL;
free(digest_rule); free(digest_rule);
@@ -2218,6 +2217,7 @@ void update_ip_plugin_table(struct Maat_table_schema* table_schema, const char*
} }
} }
Maat_table_runtime_ip_plugin_new_row(table_rt, table_schema, table_row, logger); Maat_table_runtime_ip_plugin_new_row(table_rt, table_schema, table_row, logger);
scanner->ip_plugin_update_q_size++;
return; return;
} }
@@ -2269,9 +2269,12 @@ void do_scanner_update(struct Maat_scanner* scanner, int scan_thread_num, void*
} }
break; break;
case TABLE_TYPE_IP_PLUGIN: case TABLE_TYPE_IP_PLUGIN:
Maat_table_runtime_ip_plugin_rebuild_ip_matcher(table_rt); Maat_table_runtime_ip_plugin_build_new_ip_matcher(table_rt);
old_ip_matcher=Maat_table_runtime_dettach_old_ip_matcher(table_rt); old_ip_matcher=Maat_table_runtime_apply_new_ip_matcher(table_rt);
Maat_garbage_bagging(scanner->ref_garbage_bin, old_ip_matcher, (void (*)(void*))ip_matcher_free); if(old_ip_matcher)
{
Maat_garbage_bagging(scanner->ref_garbage_bin, old_ip_matcher, (void (*)(void*))ip_matcher_free);
}
break; break;
default: default:
break; break;
@@ -2353,7 +2356,7 @@ void maat_finish_cb(void* u_para)
feather->scanner->cfg_num=scanner_rule_num(feather->scanner); feather->scanner->cfg_num=scanner_rule_num(feather->scanner);
feather->scanner->version=feather->maat_version; feather->scanner->version=feather->maat_version;
expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q); expr_wait_q_cnt=MESA_lqueue_get_count(feather->scanner->region_update_q);
feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_update_q_size; feather->postpone_q_size=expr_wait_q_cnt+feather->scanner->gie_update_q_size+feather->scanner->ip_plugin_update_q_size;
if(time(NULL)-feather->scanner->last_update_time>=feather->effect_interval_ms/1000) if(time(NULL)-feather->scanner->last_update_time>=feather->effect_interval_ms/1000)
{ {
do_scanner_update(feather->scanner, do_scanner_update(feather->scanner,

View File

@@ -144,6 +144,7 @@ static struct Maat_table_runtime* table_runtime_new(const struct Maat_table_sche
{ {
EX_data_rt_set_schema(table_rt->ip_plugin.ex_data_rt, &table_schema->ip_plugin.ex_schema); EX_data_rt_set_schema(table_rt->ip_plugin.ex_data_rt, &table_schema->ip_plugin.ex_schema);
} }
table_rt->ip_plugin.bin=Maat_garbage_bin_new(0);
break; break;
default: default:
break; break;
@@ -191,8 +192,9 @@ static void table_runtime_free(struct Maat_table_runtime* p)
case TABLE_TYPE_IP_PLUGIN: case TABLE_TYPE_IP_PLUGIN:
ip_matcher_free(p->ip_plugin.ip_matcher); ip_matcher_free(p->ip_plugin.ip_matcher);
EX_data_rt_free(p->ip_plugin.ex_data_rt); Maat_garbage_bin_free(p->ip_plugin.bin);
assert(p->ip_plugin.old_ip_matcher==NULL); EX_data_rt_free(p->ip_plugin.ex_data_rt);
assert(p->ip_plugin.new_ip_matcher==NULL);
break; break;
case TABLE_TYPE_PLUGIN: case TABLE_TYPE_PLUGIN:
EX_data_rt_free(p->plugin.ex_data_rt); EX_data_rt_free(p->plugin.ex_data_rt);
@@ -422,15 +424,22 @@ int Maat_table_runtime_digest_batch_udpate(struct Maat_table_runtime* table_rt)
return q_cnt; return q_cnt;
} }
int Maat_table_runtime_ip_plugin_rebuild_ip_matcher(struct Maat_table_runtime* table_rt) int Maat_table_runtime_ip_plugin_build_new_ip_matcher(struct Maat_table_runtime* table_rt)
{ {
struct ip_matcher* new_ip_matcher=NULL; struct ip_matcher* new_ip_matcher=NULL;
size_t rule_cnt=0; size_t rule_cnt=0;
size_t i=0, mem_use=0; size_t i=0, mem_use=0;
struct ip_rule *rules=NULL; struct ip_rule *rules=NULL;
struct EX_data_container **exc_array=NULL; struct EX_data_container **exc_array=NULL;
assert(table_rt->table_type==TABLE_TYPE_IP_PLUGIN); struct ip_plugin_runtime *ip_plugin=&(table_rt->ip_plugin);
rule_cnt=EX_data_rt_list_all_ex_container(table_rt->ip_plugin.ex_data_rt, &exc_array); assert(table_rt->table_type==TABLE_TYPE_IP_PLUGIN);
if(!ip_plugin->changed_flag)
{
assert(0==Maat_garbage_bin_get_size(ip_plugin->bin));
return 0;
}
Maat_garbage_collect_routine(ip_plugin->bin);
rule_cnt=EX_data_rt_list_all_ex_container(ip_plugin->ex_data_rt, &exc_array);
rules=ALLOC(struct ip_rule, rule_cnt); rules=ALLOC(struct ip_rule, rule_cnt);
for(i=0; i<rule_cnt; i++) for(i=0; i<rule_cnt; i++)
{ {
@@ -441,21 +450,41 @@ int Maat_table_runtime_ip_plugin_rebuild_ip_matcher(struct Maat_table_runtime* t
if(rule_cnt>0) if(rule_cnt>0)
{ {
new_ip_matcher=ip_matcher_new(rules, rule_cnt, &mem_use); new_ip_matcher=ip_matcher_new(rules, rule_cnt, &mem_use);
table_rt->ip_plugin.old_ip_matcher=table_rt->ip_plugin.ip_matcher; assert(ip_plugin->new_ip_matcher==NULL);
table_rt->ip_plugin.ip_matcher=new_ip_matcher; ip_plugin->new_ip_matcher=new_ip_matcher;
} }
free(rules); free(rules);
free(exc_array); free(exc_array);
exc_array=NULL; exc_array=NULL;
ip_plugin->changed_flag=0;
return 0; return 0;
} }
struct ip_matcher* Maat_table_runtime_dettach_old_ip_matcher(struct Maat_table_runtime* table_rt) struct ip_matcher* Maat_table_runtime_apply_new_ip_matcher(struct Maat_table_runtime* table_rt)
{ {
struct ip_matcher* old_one=table_rt->ip_plugin.old_ip_matcher; struct ip_matcher* old_one=table_rt->ip_plugin.ip_matcher;
table_rt->ip_plugin.ip_matcher=table_rt->ip_plugin.new_ip_matcher;
assert(table_rt->table_type==TABLE_TYPE_IP_PLUGIN); assert(table_rt->table_type==TABLE_TYPE_IP_PLUGIN);
table_rt->ip_plugin.old_ip_matcher=NULL; table_rt->ip_plugin.new_ip_matcher=NULL;
return old_one; return old_one;
} }
struct ip_plugin_ex_free_wrapper
{
struct EX_data_rt* ex_data_rt;
char* row;
size_t key_offset;
size_t key_len;
void* logger;
};
void ip_plugin_ex_data_wrapper_free(void* ex_data)
{
struct ip_plugin_ex_free_wrapper* wrapper=(struct ip_plugin_ex_free_wrapper*)ex_data;
EX_data_rt_delete_by_row(wrapper->ex_data_rt, wrapper->row, wrapper->row + wrapper->key_offset, wrapper->key_len, wrapper->logger);
free(wrapper->row);
wrapper->key_offset=0;
wrapper->key_len=0;
free(wrapper);
return;
}
void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, const char* row, void *logger) void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, const char* row, void *logger)
{ {
struct ip_plugin_table_schema* ip_plugin_schema=&(table_schema->ip_plugin); struct ip_plugin_table_schema* ip_plugin_schema=&(table_schema->ip_plugin);
@@ -463,8 +492,8 @@ void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, s
size_t is_valid_offset=0, valid_len=0; size_t is_valid_offset=0, valid_len=0;
size_t key_offset=0, key_len=0; size_t key_offset=0, key_len=0;
struct ip_rule* ip_rule=NULL; struct ip_rule* ip_rule=NULL;
int ret=0; int ret=0;
struct ip_plugin_ex_free_wrapper* wrapper_for_free=NULL;
if(ip_plugin_schema->have_exdata) if(ip_plugin_schema->have_exdata)
{ {
ret=Maat_helper_read_column(row, ip_plugin_schema->valid_flag_column, &is_valid_offset, &valid_len); ret=Maat_helper_read_column(row, ip_plugin_schema->valid_flag_column, &is_valid_offset, &valid_len);
@@ -493,17 +522,25 @@ void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, s
} }
if(atoi(row+is_valid_offset)==1)//add if(atoi(row+is_valid_offset)==1)//add
{ {
EX_data_rt_row2EX_data(ip_plugin_rt->ex_data_rt, row, row+key_offset, key_len, ip_rule, logger); EX_data_rt_row2EX_data(ip_plugin_rt->ex_data_rt, row, row+key_offset, key_len, ip_rule, logger);
} }
else else
{ {
EX_data_rt_delete_by_row(ip_plugin_rt->ex_data_rt, row, row+key_offset, key_len, logger); wrapper_for_free=ALLOC(struct ip_plugin_ex_free_wrapper, 1);
wrapper_for_free->row=_maat_strdup(row);
wrapper_for_free->ex_data_rt=ip_plugin_rt->ex_data_rt;
wrapper_for_free->key_len=key_len;
wrapper_for_free->key_offset=key_offset;
wrapper_for_free->logger=logger;
Maat_garbage_bagging(ip_plugin_rt->bin, wrapper_for_free, ip_plugin_ex_data_wrapper_free);
} }
} }
else else
{ {
EX_data_rt_cache_row_put(ip_plugin_rt->ex_data_rt, row); EX_data_rt_cache_row_put(ip_plugin_rt->ex_data_rt, row);
} }
ip_plugin_rt->changed_flag=1;
return; return;
} }
int Maat_table_runtime_ip_plugin_commit_ex_schema(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, void* logger) int Maat_table_runtime_ip_plugin_commit_ex_schema(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, void* logger)
@@ -518,8 +555,8 @@ int Maat_table_runtime_ip_plugin_commit_ex_schema(struct Maat_table_runtime* tab
Maat_table_runtime_ip_plugin_new_row(table_rt, table_schema, row, logger); Maat_table_runtime_ip_plugin_new_row(table_rt, table_schema, row, logger);
} }
EX_data_rt_clear_row_cache(ip_plugin_rt->ex_data_rt); EX_data_rt_clear_row_cache(ip_plugin_rt->ex_data_rt);
Maat_table_runtime_ip_plugin_rebuild_ip_matcher(table_rt); Maat_table_runtime_ip_plugin_build_new_ip_matcher(table_rt);
Maat_table_runtime_apply_new_ip_matcher(table_rt);//returned NULL.
return 0; return 0;
} }

View File

@@ -203,6 +203,7 @@ struct Maat_scanner
mcore_long_t ref_cnt; mcore_long_t ref_cnt;
rule_scanner_t region; rule_scanner_t region;
size_t gie_update_q_size; size_t gie_update_q_size;
size_t ip_plugin_update_q_size;
size_t to_update_group_cnt; size_t to_update_group_cnt;
size_t to_update_compile_cnt; size_t to_update_compile_cnt;

View File

@@ -1,5 +1,7 @@
#include "Maat_table.h" #include "Maat_table.h"
#include "Maat_ex_data.h" #include "Maat_ex_data.h"
#include "Maat_garbage_collection.h"
#include "IPMatcher.h" #include "IPMatcher.h"
#include "gram_index_engine.h" #include "gram_index_engine.h"
#include "alignment_int64.h" #include "alignment_int64.h"
@@ -23,7 +25,9 @@ struct ip_plugin_runtime
{ {
struct EX_data_rt* ex_data_rt; struct EX_data_rt* ex_data_rt;
struct ip_matcher* ip_matcher; struct ip_matcher* ip_matcher;
struct ip_matcher* old_ip_matcher; struct ip_matcher* new_ip_matcher;
struct Maat_garbage_bin* bin;
int changed_flag;
}; };
struct expr_runtime struct expr_runtime
{ {
@@ -64,6 +68,7 @@ struct Maat_table_runtime_manager;
struct Maat_table_runtime_manager* Maat_table_runtime_manager_create(struct Maat_table_manager* table_manager, int max_thread_num); struct Maat_table_runtime_manager* Maat_table_runtime_manager_create(struct Maat_table_manager* table_manager, int max_thread_num);
void Maat_table_rt_manager_destroy(struct Maat_table_runtime_manager* table_rt_mgr); void Maat_table_rt_manager_destroy(struct Maat_table_runtime_manager* table_rt_mgr);
struct Maat_table_runtime* Maat_table_runtime_get(struct Maat_table_runtime_manager* table_rt_mgr, int table_id); struct Maat_table_runtime* Maat_table_runtime_get(struct Maat_table_runtime_manager* table_rt_mgr, int table_id);
size_t Maat_table_runtime_plugin_cached_row_count(struct Maat_table_runtime* table_rt); size_t Maat_table_runtime_plugin_cached_row_count(struct Maat_table_runtime* table_rt);
const char* Maat_table_runtime_plugin_get_cached_row(struct Maat_table_runtime* table_rt, size_t Nth_row); const char* Maat_table_runtime_plugin_get_cached_row(struct Maat_table_runtime* table_rt, size_t Nth_row);
@@ -78,7 +83,7 @@ void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, s
int Maat_table_runtime_ip_plugin_commit_ex_schema(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, void* logger); int Maat_table_runtime_ip_plugin_commit_ex_schema(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, void* logger);
int Maat_table_runtime_ip_plugin_get_N_ex_data(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, const struct ip_data* ip, MAAT_PLUGIN_EX_DATA* ex_data_array, size_t size); int Maat_table_runtime_ip_plugin_get_N_ex_data(struct Maat_table_runtime* table_rt, struct Maat_table_schema* table_schema, const struct ip_data* ip, MAAT_PLUGIN_EX_DATA* ex_data_array, size_t size);
int Maat_table_runtime_ip_plugin_rebuild_ip_matcher(struct Maat_table_runtime* table_rt); int Maat_table_runtime_ip_plugin_build_new_ip_matcher(struct Maat_table_runtime* table_rt);
struct ip_matcher* Maat_table_runtime_dettach_old_ip_matcher(struct Maat_table_runtime* table_rt); struct ip_matcher* Maat_table_runtime_apply_new_ip_matcher(struct Maat_table_runtime* table_rt);

View File

@@ -3030,7 +3030,7 @@ void plugin_EX_dup_cb(int table_id, MAAT_PLUGIN_EX_DATA *to, MAAT_PLUGIN_EX_DATA
TEST_F(MaatCmdTest, PluginEXData) TEST_F(MaatCmdTest, PluginEXData)
{ {
#define plugin_EX_data_index #define plugin_EX_data
Maat_feather_t feather=MaatCmdTest::_shared_feather; Maat_feather_t feather=MaatCmdTest::_shared_feather;
int ret=0, i=0; int ret=0, i=0;
@@ -3086,6 +3086,84 @@ TEST_F(MaatCmdTest, PluginEXData)
return; return;
} }
TEST_F(MaatCmdTest, IPPluginEXData)
{
#define ip_plugin_EX_data
Maat_feather_t feather=MaatCmdTest::_shared_feather;
int ret=0, i=0;
int table_id=0, ip_plugin_ex_data_counter=0;
const char* table_name="TEST_IP_PLUGIN_WITH_EXDATA";
const int TEST_CMD_LINE_NUM=4;
const struct Maat_cmd_line *p_line[TEST_CMD_LINE_NUM];
struct Maat_cmd_line line_rule[TEST_CMD_LINE_NUM];
const char* table_line[TEST_CMD_LINE_NUM]={
"101\t4\t192.168.30.99\t192.168.30.101\tSomething-like-json\t1",
"102\t4\t192.168.30.90\t192.168.30.128\tBigger-range-should-in-the-back\t1",
"103\t6\t2001:db8:1234::\t2001:db8:1235::\tBigger-range-should-in-the-back\t1",
"104\t6\t2001:db8:1234::1\t2001:db8:1234::5210\tSomething-like-json\t1"
};
table_id=Maat_table_register(feather, table_name);
ASSERT_GT(table_id, 0);
memset(&line_rule,0,sizeof(line_rule));
for(i=0;i<TEST_CMD_LINE_NUM;i++)
{
line_rule[i].label_id=0;
line_rule[i].rule_id=(int)Maat_cmd_incrby(feather,"TEST_PLUG_SEQ", 1);
line_rule[i].table_name=table_name;
line_rule[i].table_line=table_line[i];
line_rule[i].expire_after=0;
p_line[i]=line_rule+i;
}
ret=Maat_cmd_set_lines(feather, p_line,TEST_CMD_LINE_NUM, MAAT_OP_ADD);
EXPECT_GT(ret, 0);
usleep(WAIT_FOR_EFFECTIVE_US);
ret=Maat_ip_plugin_EX_register(feather, table_id,
ip_plugin_EX_new_cb,
ip_plugin_EX_free_cb,
ip_plugin_EX_dup_cb,
0, &ip_plugin_ex_data_counter);
ASSERT_TRUE(ret>=0);
EXPECT_EQ(ip_plugin_ex_data_counter, 4);
struct ip_address ipv4, ipv6;
struct ip_plugin_ud* result[4];
ipv4.ip_type=4;
inet_pton(AF_INET, "192.168.30.100", &(ipv4.ipv4));
memset(&result, 0, sizeof(result));
ret=Maat_ip_plugin_get_EX_data(feather, table_id, &ipv4, (void**)result, 4);
ASSERT_EQ(ret, 2);
EXPECT_EQ(result[0]->rule_id, 101);
EXPECT_EQ(result[1]->rule_id, 102);
for(i=0; i<ret; i++)
{
ip_plugin_EX_free_cb(0, (void**)&(result[i]), 0, NULL);
}
ipv6.ip_type=6;
inet_pton(AF_INET6,"2001:db8:1234::5210",&(ipv6.ipv6));
memset(&result, 0, sizeof(result));
ret=Maat_ip_plugin_get_EX_data(feather, table_id, &ipv6, (void**)result, 4);
ASSERT_EQ(ret, 2);
EXPECT_EQ(result[0]->rule_id, 104);
EXPECT_EQ(result[1]->rule_id, 103);
for(i=0; i<ret; i++)
{
ip_plugin_EX_free_cb(0, (void**)&(result[i]), 0, NULL);
}
ret=Maat_cmd_set_lines(feather, p_line , TEST_CMD_LINE_NUM, MAAT_OP_DEL);
EXPECT_GT(ret, 0);
usleep(WAIT_FOR_EFFECTIVE_US);
ret=Maat_ip_plugin_get_EX_data(feather, table_id, &ipv4, (void**)result, 4);
ASSERT_EQ(ret, 0);
return;
}
#define TEST_HIT_PATH #define TEST_HIT_PATH
TEST_F(MaatCmdTest, HitPath) TEST_F(MaatCmdTest, HitPath)
{ {