From 571ce08d3ba7bb170702cf6441997d056a270878 Mon Sep 17 00:00:00 2001 From: liuwentan Date: Thu, 13 Apr 2023 14:56:35 +0800 Subject: [PATCH] fix stream scan core https://jira.geedge.net/browse/TSG-14701 --- src/inc_internal/maat_bool_plugin.h | 5 +- src/inc_internal/maat_compile.h | 17 +-- src/inc_internal/maat_expr.h | 12 +- src/inc_internal/maat_flag.h | 12 +- src/inc_internal/maat_fqdn_plugin.h | 5 +- src/inc_internal/maat_group.h | 13 +- src/inc_internal/maat_interval.h | 11 +- src/inc_internal/maat_ip.h | 12 +- src/inc_internal/maat_ip_plugin.h | 5 +- src/inc_internal/maat_plugin.h | 12 +- src/inc_internal/maat_table.h | 5 +- src/maat_api.c | 18 ++- src/maat_bool_plugin.c | 16 ++- src/maat_compile.c | 26 ++-- src/maat_expr.c | 25 +++- src/maat_flag.c | 24 ++-- src/maat_fqdn_plugin.c | 22 ++-- src/maat_group.c | 29 ++++- src/maat_interval.c | 17 +-- src/maat_ip.c | 21 ++-- src/maat_ip_plugin.c | 21 ++-- src/maat_plugin.c | 18 ++- src/maat_rule.c | 16 ++- src/maat_table.c | 36 +++--- test/maat_framework_gtest.cpp | 184 ++++++++++++++-------------- 25 files changed, 341 insertions(+), 241 deletions(-) diff --git a/src/inc_internal/maat_bool_plugin.h b/src/inc_internal/maat_bool_plugin.h index a07092a..aa360a6 100644 --- a/src/inc_internal/maat_bool_plugin.h +++ b/src/inc_internal/maat_bool_plugin.h @@ -38,13 +38,14 @@ struct ex_container_schema *bool_plugin_table_get_ex_container_schema(void *bool /* ip plugin runtime API */ void *bool_plugin_runtime_new(void *bool_plugin_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void bool_plugin_runtime_free(void *bool_plugin_runtime); int bool_plugin_runtime_update(void *bool_plugin_runtime, void *bool_plugin_schema, const char *table_name, const char *line, int valid_column); -int bool_plugin_runtime_commit(void *bool_plugin_runtime, const char *table_name); + +int bool_plugin_runtime_commit(void *bool_plugin_runtime, const char *table_name, long long maat_rt_version); long long bool_plugin_runtime_rule_count(void *bool_plugin_runtime); diff --git a/src/inc_internal/maat_compile.h b/src/inc_internal/maat_compile.h index 9098489..19ab918 100644 --- a/src/inc_internal/maat_compile.h +++ b/src/inc_internal/maat_compile.h @@ -45,15 +45,16 @@ int compile_table_set_ex_data_schema(struct compile_schema *compile_schema, int /* compile runtime API */ void *compile_runtime_new(void *compile_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void compile_runtime_free(void *compile_runtime); void compile_runtime_init(void *compile_runtime, struct maat_runtime *maat_rt); -int compile_runtime_update(void *compile_runtime, void *compile_schema, - const char *table_name, const char *line, int valid_column); -int compile_runtime_commit(void *compile_runtime, const char *table_name); +int compile_runtime_update(void *compile_runtime, void *compile_schema, const char *table_name, + const char *line, int valid_column); + +int compile_runtime_commit(void *compile_runtime, const char *table_name, long long maat_rt_version); long long compile_runtime_rule_count(void *compile_runtime); @@ -73,14 +74,14 @@ void compile_runtime_ex_data_iterate(struct compile_runtime *compile_rt, /* group2compile runtime API */ void *group2compile_runtime_new(void *g2c_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void group2compile_runtime_init(void *g2c_runtime, void *compile_runtime, void *g2g_runtime); void group2compile_runtime_free(void *g2c_runtime); -int group2compile_runtime_update(void *g2c_runtime, void *g2c_schema, - const char *table_name, const char *line, - int valid_column); +int group2compile_runtime_update(void *g2c_runtime, void *g2c_schema, const char *table_name, + const char *line, int valid_column); + long long group2compile_runtime_rule_count(void *g2c_runtime); /* maat compile state API */ diff --git a/src/inc_internal/maat_expr.h b/src/inc_internal/maat_expr.h index 2932dbb..1570c55 100644 --- a/src/inc_internal/maat_expr.h +++ b/src/inc_internal/maat_expr.h @@ -28,17 +28,19 @@ void expr_schema_free(void *expr_schema); /* expr runtime API */ void *expr_runtime_new(void *expr_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void expr_runtime_free(void *expr_runtime); -int expr_runtime_update(void *expr_runtime, void *expr_schema, - const char *table_name, const char *line, - int valid_column); -int expr_runtime_commit(void *expr_runtime, const char *table_name); +int expr_runtime_update(void *expr_runtime, void *expr_schema, const char *table_name, + const char *line, int valid_column); + +int expr_runtime_commit(void *expr_runtime, const char *table_name, long long maat_rt_version); long long expr_runtime_rule_count(void *expr_runtime); +long long expr_runtime_get_version(void *expr_runtime); + /* expr runtime scan API */ /** * @brief scan string to get hit group_ids diff --git a/src/inc_internal/maat_flag.h b/src/inc_internal/maat_flag.h index f133fd6..c4a49de 100644 --- a/src/inc_internal/maat_flag.h +++ b/src/inc_internal/maat_flag.h @@ -29,15 +29,15 @@ void *flag_schema_new(cJSON *json, struct table_manager *tbl_mgr, void flag_schema_free(void *flag_schema); /* flag runtime API */ -void *flag_runtime_new(void *flag_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *flag_runtime_new(void *flag_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void flag_runtime_free(void *flag_runtime); -int flag_runtime_update(void *flag_runtime, void *flag_schema, - const char *table_name, const char *line, - int valid_column); -int flag_runtime_commit(void *flag_runtime, const char *table_name); +int flag_runtime_update(void *flag_runtime, void *flag_schema, const char *table_name, + const char *line, int valid_column); + +int flag_runtime_commit(void *flag_runtime, const char *table_name, long long maat_rt_version); long long flag_runtime_rule_count(void *flag_runtime); diff --git a/src/inc_internal/maat_fqdn_plugin.h b/src/inc_internal/maat_fqdn_plugin.h index 78435c7..e1f881e 100644 --- a/src/inc_internal/maat_fqdn_plugin.h +++ b/src/inc_internal/maat_fqdn_plugin.h @@ -40,13 +40,14 @@ struct ex_container_schema *fqdn_plugin_table_get_ex_container_schema(void *fqdn /* fqdn plugin runtime API */ void *fqdn_plugin_runtime_new(void *fqdn_plugin_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void fqdn_plugin_runtime_free(void *fqdn_plugin_runtime); int fqdn_plugin_runtime_update(void *fqdn_plugin_runtime, void *fqdn_plugin_schema, const char *table_name, const char *line, int valid_column); -int fqdn_plugin_runtime_commit(void *fqdn_plugin_runtime, const char *table_name); + +int fqdn_plugin_runtime_commit(void *fqdn_plugin_runtime, const char *table_name, long long maat_rt_version); long long fqdn_plugin_runtime_rule_count(void *fqdn_plugin_runtime); diff --git a/src/inc_internal/maat_group.h b/src/inc_internal/maat_group.h index f4ef644..b693d33 100644 --- a/src/inc_internal/maat_group.h +++ b/src/inc_internal/maat_group.h @@ -30,9 +30,9 @@ void *group2group_schema_new(cJSON *json, struct table_manager *tbl_mgr, void group2group_schema_free(void *g2g_schema); /* group2group runtime API */ -void *group2group_runtime_new(void *g2g_schema, int max_thread_num, +void *group2group_runtime_new(void *g2g_schema, int max_thread_num, struct maat_garbage_bin *garbage_bin, - struct log_handle *logger); + struct log_handle *logger); void group2group_runtime_free(void *g2g_runtime); void maat_group_ref_inc(struct maat_group *group); @@ -42,14 +42,13 @@ struct maat_group *group2group_runtime_add_group(void *g2g_runtime, long long gr void group2group_runtime_remove_group(void *g2g_runtime, struct maat_group *group); struct maat_group *group2group_runtime_find_group(void *g2g_runtime, long long group_id); -int group2group_runtime_build_top_groups(void *g2g_runtime); int group2group_runtime_get_top_groups(void *g2g_runtime, long long *group_ids, size_t n_group_ids, long long *top_group_ids); -int group2group_runtime_update(void *g2g_runtime, void *g2g_schema, - const char *table_name, const char *line, - int valid_column); -int group2group_runtime_commit(void *g2g_runtime, const char *table_name); +int group2group_runtime_update(void *g2g_runtime, void *g2g_schema, const char *table_name, + const char *line, int valid_column); + +int group2group_runtime_commit(void *g2g_runtime, const char *table_name, long long maat_rt_version); long long group2group_runtime_rule_count(void *g2g_runtime); diff --git a/src/inc_internal/maat_interval.h b/src/inc_internal/maat_interval.h index c8ca956..9a514f4 100644 --- a/src/inc_internal/maat_interval.h +++ b/src/inc_internal/maat_interval.h @@ -28,14 +28,15 @@ void *interval_schema_new(cJSON *json, struct table_manager *tbl_mgr, void interval_schema_free(void *interval_schema); /* interval runtime API */ -void *interval_runtime_new(void *interval_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *interval_runtime_new(void *interval_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void interval_runtime_free(void *interval_runtime); -int interval_runtime_update(void *interval_runtime, void *interval_schema, - const char *table_name,const char *line, int valid_column); -int interval_runtime_commit(void *interval_runtime, const char *table_name); +int interval_runtime_update(void *interval_runtime, void *interval_schema, const char *table_name, + const char *line, int valid_column); + +int interval_runtime_commit(void *interval_runtime, const char *table_name, long long maat_rt_version); long long interval_runtime_rule_count(void *interval_runtime); diff --git a/src/inc_internal/maat_ip.h b/src/inc_internal/maat_ip.h index c3df373..13bc01b 100644 --- a/src/inc_internal/maat_ip.h +++ b/src/inc_internal/maat_ip.h @@ -26,15 +26,15 @@ void *ip_schema_new(cJSON *json, struct table_manager *tbl_mgr, void ip_schema_free(void *ip_schema); /* ip runtime API */ -void *ip_runtime_new(void *ip_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *ip_runtime_new(void *ip_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void ip_runtime_free(void *ip_runtime); -int ip_runtime_update(void *ip_runtime, void *ip_schema, - const char *table_name, const char *line, - int valid_column); -int ip_runtime_commit(void *ip_runtime, const char *table_name); +int ip_runtime_update(void *ip_runtime, void *ip_schema, const char *table_name, + const char *line, int valid_column); + +int ip_runtime_commit(void *ip_runtime, const char *table_name, long long maat_rt_version); long long ip_runtime_rule_count(void *ip_runtime); diff --git a/src/inc_internal/maat_ip_plugin.h b/src/inc_internal/maat_ip_plugin.h index 0051af8..802944e 100644 --- a/src/inc_internal/maat_ip_plugin.h +++ b/src/inc_internal/maat_ip_plugin.h @@ -39,13 +39,14 @@ struct ex_container_schema *ip_plugin_table_get_ex_container_schema(void *ip_plu /* ip plugin runtime API */ void *ip_plugin_runtime_new(void *ip_plugin_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void ip_plugin_runtime_free(void *ip_plugin_runtime); int ip_plugin_runtime_update(void *ip_plugin_runtime, void *ip_plugin_schema, const char *table_name, const char *line, int valid_column); -int ip_plugin_runtime_commit(void *ip_plugin_runtime, const char *table_name); + +int ip_plugin_runtime_commit(void *ip_plugin_runtime, const char *table_name, long long maat_rt_version); long long ip_plugin_runtime_rule_count(void *ip_plugin_runtime); diff --git a/src/inc_internal/maat_plugin.h b/src/inc_internal/maat_plugin.h index e275995..ec22c02 100644 --- a/src/inc_internal/maat_plugin.h +++ b/src/inc_internal/maat_plugin.h @@ -50,15 +50,15 @@ int plugin_table_set_ex_container_schema(void *plugin_schema, int table_id, struct ex_container_schema *plugin_table_get_ex_container_schema(void *plugin_schema); /* plugin runtime API */ -void *plugin_runtime_new(void *plugin_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *plugin_runtime_new(void *plugin_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger); void plugin_runtime_free(void *plugin_runtime); -int plugin_runtime_update(void *plugin_runtime, void *plugin_schema, - const char *table_name, const char *line, - int valid_column); -int plugin_runtime_commit(void *plugin_runtime, const char *table_name); +int plugin_runtime_update(void *plugin_runtime, void *plugin_schema, const char *table_name, + const char *line, int valid_column); + +int plugin_runtime_commit(void *plugin_runtime, const char *table_name, long long maat_rt_version); long long plugin_runtime_rule_count(void *plugin_runtime); diff --git a/src/inc_internal/maat_table.h b/src/inc_internal/maat_table.h index 7498b69..bb745dc 100644 --- a/src/inc_internal/maat_table.h +++ b/src/inc_internal/maat_table.h @@ -51,6 +51,7 @@ table_manager_create(const char *table_info_path, const char *accept_tags, struct maat_garbage_bin *garbage_bin, struct log_handle *logger); int table_manager_runtime_create(struct table_manager *tbl_mgr, int max_thread_num, struct maat_garbage_bin *garbage_bin); + void table_manager_runtime_destroy(struct table_manager *tbl_mgr); void table_manager_destroy(struct table_manager *tbl_mgr); @@ -72,7 +73,9 @@ void *table_manager_get_updating_runtime(struct table_manager *tbl_mgr, int tabl int table_manager_update_runtime(struct table_manager *tbl_mgr, const char *table_name, int table_id, const char *line, int update_type); -void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id, int update_type); + +void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id, + int update_type, long long maat_rt_version); long long table_manager_runtime_rule_count(struct table_manager *tbl_mgr, int table_id); diff --git a/src/maat_api.c b/src/maat_api.c index 560989c..696b1c2 100644 --- a/src/maat_api.c +++ b/src/maat_api.c @@ -46,6 +46,7 @@ struct maat_stream { struct maat *ref_maat_instance; struct adapter_hs_stream *s_handle; //each physical table open one stream long long last_full_version; + long long expr_rt_version; struct log_handle *logger; int thread_id; int vtable_id; @@ -626,16 +627,16 @@ int generic_plugin_runtime_commit_ex_schema(void *runtime, void *schema, const c switch (table_type) { case TABLE_TYPE_PLUGIN: - plugin_runtime_commit(runtime, table_name); + plugin_runtime_commit(runtime, table_name, 0); break; case TABLE_TYPE_IP_PLUGIN: - ip_plugin_runtime_commit(runtime, table_name); + ip_plugin_runtime_commit(runtime, table_name, 0); break; case TABLE_TYPE_FQDN_PLUGIN: - fqdn_plugin_runtime_commit(runtime, table_name); + fqdn_plugin_runtime_commit(runtime, table_name, 0); break; case TABLE_TYPE_BOOL_PLUGIN: - bool_plugin_runtime_commit(runtime, table_name); + bool_plugin_runtime_commit(runtime, table_name, 0); break; default: break; @@ -1472,6 +1473,7 @@ struct maat_stream *maat_stream_new(struct maat *maat_instance, int table_id, void *expr_rt = table_manager_get_runtime(stream->ref_maat_instance->tbl_mgr, stream->physical_table_id); assert(expr_rt != NULL); + stream->expr_rt_version = expr_runtime_get_version(expr_rt); struct adapter_hs_stream *handle = expr_runtime_stream_open((struct expr_runtime *)expr_rt, state->thread_id); @@ -1503,6 +1505,14 @@ int maat_stream_scan(struct maat_stream *maat_stream, const char *data, int data return MAAT_SCAN_OK; } + void *expr_rt = table_manager_get_runtime(maat_instance->tbl_mgr, maat_stream->physical_table_id); + assert(expr_rt != NULL); + + long long cur_expr_rt_version = expr_runtime_get_version(expr_rt); + if (maat_stream->expr_rt_version != cur_expr_rt_version) { + return MAAT_SCAN_OK; + } + alignment_int64_array_add(maat_stream->ref_maat_instance->thread_call_cnt, maat_stream->thread_id, 1); int hit_group_cnt = expr_stream_scan(maat_stream, data, data_len, state); diff --git a/src/maat_bool_plugin.c b/src/maat_bool_plugin.c index 7b88f09..dc0caa0 100644 --- a/src/maat_bool_plugin.c +++ b/src/maat_bool_plugin.c @@ -34,6 +34,7 @@ struct bool_plugin_schema { struct bool_plugin_runtime { struct bool_matcher *matcher; struct ex_data_runtime *ex_data_rt; + long long version; long long rule_num; struct maat_garbage_bin *ref_garbage_bin; struct log_handle *logger; @@ -169,7 +170,7 @@ size_t ull_dedup(unsigned long long item_ids[], size_t n_item) } void *bool_plugin_runtime_new(void *bool_plugin_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { if (NULL == bool_plugin_schema) { @@ -405,7 +406,7 @@ void garbage_bool_matcher_free(void *matcher, void *arg) bool_matcher_free(bm); } -int bool_plugin_runtime_commit(void *bool_plugin_runtime, const char *table_name) +int bool_plugin_runtime_commit(void *bool_plugin_runtime, const char *table_name, long long maat_rt_version) { if (NULL == bool_plugin_runtime) { return -1; @@ -436,10 +437,6 @@ int bool_plugin_runtime_commit(void *bool_plugin_runtime, const char *table_name } } - log_info(bool_plugin_rt->logger, MODULE_BOOL_PLUGIN, - "table[%s] committing %zu bool_plugin rules for rebuilding bool_matcher engine", - table_name, rule_cnt); - int ret = 0; size_t mem_used = 0; struct bool_matcher *new_bool_matcher = NULL; @@ -463,6 +460,13 @@ int bool_plugin_runtime_commit(void *bool_plugin_runtime, const char *table_name } bool_plugin_rt->rule_num = rule_cnt; + if (maat_rt_version != 0) { + bool_plugin_rt->version = maat_rt_version; + } + + log_info(bool_plugin_rt->logger, MODULE_BOOL_PLUGIN, + "table[%s] commit %zu bool_plugin rules and rebuild bool_matcher completed, version:%lld", + table_name, rule_cnt, bool_plugin_rt->version); if (rules != NULL) { FREE(rules); diff --git a/src/maat_compile.c b/src/maat_compile.c index f4041fa..33e4a17 100644 --- a/src/maat_compile.c +++ b/src/maat_compile.c @@ -85,6 +85,7 @@ struct compile_runtime { struct group2compile_runtime { long long not_flag_group; + long long version; long long rule_num; struct compile_runtime *ref_compile_rt; struct group2group_runtime *ref_g2g_rt; @@ -493,9 +494,9 @@ void compile_item_free(struct compile_item *compile_item) FREE(compile_item); } -void *compile_runtime_new(void *compile_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, - struct log_handle *logger) +void *compile_runtime_new(void *compile_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, + struct log_handle *logger) { if (NULL == compile_schema) { return NULL; @@ -606,11 +607,16 @@ void compile_runtime_init(void *compile_runtime, struct maat_runtime *maat_rt) compile_rt->ref_maat_rt = maat_rt; } -void *group2compile_runtime_new(void *g2c_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *group2compile_runtime_new(void *g2c_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { + if (NULL == g2c_schema) { + return NULL; + } + struct group2compile_runtime *g2c_rt = ALLOC(struct group2compile_runtime, 1); + return g2c_rt; } @@ -1679,7 +1685,7 @@ long long group2compile_runtime_rule_count(void *g2c_runtime) return g2c_rt->rule_num; } -int compile_runtime_commit(void *compile_runtime, const char *table_name) +int compile_runtime_commit(void *compile_runtime, const char *table_name, long long maat_rt_version) { if (NULL == compile_runtime) { return -1; @@ -1699,10 +1705,6 @@ int compile_runtime_commit(void *compile_runtime, const char *table_name) return 0; } - log_info(compile_rt->logger, MODULE_COMPILE, - "table[%s] committing %zu compile rules for rebuilding compile bool_matcher engine", - table_name, compile_cnt); - int ret = 0; new_bool_matcher = maat_compile_bool_matcher_new(compile_rt); if (NULL == new_bool_matcher) { @@ -1718,6 +1720,10 @@ int compile_runtime_commit(void *compile_runtime, const char *table_name) compile_rt->updating_flag = 0; pthread_rwlock_unlock(&compile_rt->rwlock); + log_info(compile_rt->logger, MODULE_COMPILE, + "table[%s] commit %zu compile rules and rebuild compile bool_matcher completed, version:%lld", + table_name, compile_cnt, maat_rt_version); + maat_garbage_bagging(compile_rt->ref_garbage_bin, old_bool_matcher, NULL, garbage_bool_matcher_free); diff --git a/src/maat_expr.c b/src/maat_expr.c index a9519c7..136738e 100644 --- a/src/maat_expr.c +++ b/src/maat_expr.c @@ -71,6 +71,7 @@ struct expr_runtime { struct rcu_hash_table *htable; // store hs_expr rule for rebuild adapter_hs instance struct rcu_hash_table *item_htable; // store this expr table's all maat_item which will be used in expr_runtime_scan + long long version; //expr_rt version long long rule_num; int n_worker_thread; struct maat_garbage_bin *ref_garbage_bin; @@ -446,7 +447,7 @@ void expr_maat_item_free(void *user_ctx, void *data) } void *expr_runtime_new(void *expr_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { if (NULL == expr_schema) { @@ -803,7 +804,7 @@ void garbage_adapter_hs_free(void *adapter_hs, void *arg) adapter_hs_free(hs); } -int expr_runtime_commit(void *expr_runtime, const char *table_name) +int expr_runtime_commit(void *expr_runtime, const char *table_name, long long maat_rt_version) { if (NULL == expr_runtime) { return -1; @@ -837,10 +838,6 @@ int expr_runtime_commit(void *expr_runtime, const char *table_name) } } - log_info(expr_rt->logger, MODULE_EXPR, - "table[%s] committing %zu expr rules for rebuilding adapter_hs engine", - table_name, rule_cnt); - int ret = 0; struct adapter_hs *new_adapter_hs = NULL; struct adapter_hs *old_adapter_hs = NULL; @@ -861,7 +858,13 @@ int expr_runtime_commit(void *expr_runtime, const char *table_name) maat_garbage_bagging(expr_rt->ref_garbage_bin, old_adapter_hs, NULL, garbage_adapter_hs_free); } rcu_hash_commit(expr_rt->item_htable); + expr_rt->rule_num = rule_cnt; + expr_rt->version = maat_rt_version; + + log_info(expr_rt->logger, MODULE_EXPR, + "table[%s] commit %zu expr rules and rebuild adapter_hs completed, version:%lld", + table_name, rule_cnt, expr_rt->version); if (rules != NULL) { FREE(rules); @@ -884,6 +887,16 @@ long long expr_runtime_rule_count(void *expr_runtime) return expr_rt->rule_num; } +long long expr_runtime_get_version(void *expr_runtime) +{ + if (NULL == expr_runtime) { + return -1; + } + + struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime; + return expr_rt->version; +} + int expr_runtime_scan(struct expr_runtime *expr_rt, int thread_id, const char *data, size_t data_len, int vtable_id, struct maat_state *state) { diff --git a/src/maat_flag.c b/src/maat_flag.c index 929d4c9..e67b606 100644 --- a/src/maat_flag.c +++ b/src/maat_flag.c @@ -47,6 +47,7 @@ struct flag_runtime { struct rcu_hash_table *htable; //store flag rule for rebuild flag_matcher instance struct rcu_hash_table *item_htable; //store this flag table's all maat_item which will be used in flag_runtime_scan long long rule_num; + long long version; struct maat_garbage_bin *ref_garbage_bin; struct log_handle *logger; int district_num; @@ -163,8 +164,8 @@ void flag_maat_item_free(void *user_ctx, void *data) maat_item_free(item); } -void *flag_runtime_new(void *flag_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *flag_runtime_new(void *flag_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { if (NULL == flag_schema) { @@ -376,9 +377,8 @@ void flag_rule_free(struct flag_rule *rule) FREE(rule); } -int flag_runtime_update(void *flag_runtime, void *flag_schema, - const char *table_name, const char *line, - int valid_column) +int flag_runtime_update(void *flag_runtime, void *flag_schema, const char *table_name, + const char *line, int valid_column) { if (NULL == flag_runtime || NULL == flag_schema || NULL == line) { @@ -451,7 +451,7 @@ void garbage_flag_matcher_free(void *flag_matcher, void *arg) flag_matcher_free(matcher); } -int flag_runtime_commit(void *flag_runtime, const char *table_name) +int flag_runtime_commit(void *flag_runtime, const char *table_name, long long maat_rt_version) { if (NULL == flag_runtime) { return -1; @@ -465,7 +465,7 @@ int flag_runtime_commit(void *flag_runtime, const char *table_name) } rcu_hash_commit(flag_rt->htable); - + if (flag_rt->tmp_district_map != NULL) { struct maat_kv_store *tmp_map = flag_rt->district_map; flag_rt->district_map = flag_rt->tmp_district_map; @@ -484,10 +484,6 @@ int flag_runtime_commit(void *flag_runtime, const char *table_name) } } - log_info(flag_rt->logger, MODULE_FLAG, - "table[%s] committing %zu flag rules for rebuilding flag_matcher engine", - table_name, rule_cnt); - int ret = 0; struct flag_matcher *new_flag_matcher = NULL; struct flag_matcher *old_flag_matcher = NULL; @@ -509,7 +505,13 @@ int flag_runtime_commit(void *flag_runtime, const char *table_name) garbage_flag_matcher_free); } rcu_hash_commit(flag_rt->item_htable); + flag_rt->rule_num = rule_cnt; + flag_rt->version = maat_rt_version; + + log_info(flag_rt->logger, MODULE_FLAG, + "table[%s] commit %zu flag rules and rebuild flag_matcher completed, version:%lld", + table_name, rule_cnt, flag_rt->version); if (rules != NULL) { FREE(rules); diff --git a/src/maat_fqdn_plugin.c b/src/maat_fqdn_plugin.c index 39fa403..a769584 100644 --- a/src/maat_fqdn_plugin.c +++ b/src/maat_fqdn_plugin.c @@ -35,9 +35,8 @@ struct fqdn_plugin_schema { struct fqdn_plugin_runtime { struct FQDN_engine *engine; struct ex_data_runtime *ex_data_rt; - - uint32_t rule_num; - + long long version; + long long rule_num; struct maat_garbage_bin *ref_garbage_bin; struct log_handle *logger; }; @@ -164,7 +163,7 @@ void fqdn_rule_free(struct FQDN_rule *fqdn_rule) } void *fqdn_plugin_runtime_new(void *fqdn_plugin_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { if (NULL == fqdn_plugin_schema) { @@ -403,7 +402,7 @@ void garbage_fqdn_engine_free(void *fqdn_engine, void *arg) FQDN_engine_free(engine); } -int fqdn_plugin_runtime_commit(void *fqdn_plugin_runtime, const char *table_name) +int fqdn_plugin_runtime_commit(void *fqdn_plugin_runtime, const char *table_name, long long maat_rt_version) { if (NULL == fqdn_plugin_runtime) { return -1; @@ -434,10 +433,6 @@ int fqdn_plugin_runtime_commit(void *fqdn_plugin_runtime, const char *table_name } } - log_info(fqdn_plugin_rt->logger, MODULE_FQDN_PLUGIN, - "table[%s] committing %zu fqdn_plugin rules for rebuilding FQDN engine", - table_name, rule_cnt); - int ret = 0; struct FQDN_engine *new_fqdn_engine = NULL; struct FQDN_engine *old_fqdn_engine = NULL; @@ -458,8 +453,15 @@ int fqdn_plugin_runtime_commit(void *fqdn_plugin_runtime, const char *table_name maat_garbage_bagging(fqdn_plugin_rt->ref_garbage_bin, old_fqdn_engine, NULL, garbage_fqdn_engine_free); } - + fqdn_plugin_rt->rule_num = rule_cnt; + if (maat_rt_version != 0) { + fqdn_plugin_rt->version = maat_rt_version; + } + + log_info(fqdn_plugin_rt->logger, MODULE_FQDN_PLUGIN, + "table[%s] commit %zu fqdn_plugin rules and rebuild FQDN engine completed, version:%lld", + table_name, rule_cnt, fqdn_plugin_rt->version); if (rules != NULL) { FREE(rules); diff --git a/src/maat_group.c b/src/maat_group.c index f639285..2719b6d 100644 --- a/src/maat_group.c +++ b/src/maat_group.c @@ -59,6 +59,7 @@ struct maat_group_topology { struct group2group_runtime { struct maat_group_topology *group_topo; + long long version; long long rule_num; pthread_rwlock_t rwlock; @@ -135,10 +136,14 @@ struct maat_group_topology *maat_group_topology_new(struct log_handle *logger) return group_topo; } -void *group2group_runtime_new(void *g2g_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, - struct log_handle *logger) +void *group2group_runtime_new(void *g2g_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, + struct log_handle *logger) { + if (NULL == g2g_schema) { + return NULL; + } + struct group2group_runtime *g2g_rt = ALLOC(struct group2group_runtime, 1); g2g_rt->group_topo = maat_group_topology_new(logger); @@ -489,7 +494,7 @@ static size_t effective_vertices_count(igraph_vector_t *vids) return i; } -int group2group_runtime_build_top_groups(void *g2g_runtime) +int group2group_runtime_build_top_groups(void *g2g_runtime, long long maat_rt_version) { if (NULL == g2g_runtime) { return -1; @@ -572,6 +577,8 @@ int group2group_runtime_build_top_groups(void *g2g_runtime) FREE(temp_group_ids); } igraph_vector_destroy(&group_topo->dfs_vids); + g2g_rt->version = maat_rt_version; + pthread_rwlock_unlock(&(g2g_rt->rwlock)); return 0; @@ -604,24 +611,30 @@ int group2group_runtime_update(void *g2g_runtime, void *g2g_schema, //delete ret = group2group_runtime_remove_group_from_group(g2g_runtime, g2g_item->group_id, g2g_item->super_group_id); + if (0 == ret) { + g2g_rt->rule_num--; + } } else { //add ret = group2group_runtime_add_group_to_group(g2g_runtime, g2g_item->group_id, g2g_item->super_group_id); + if (0 == ret) { + g2g_rt->rule_num++; + } } group2group_item_free(g2g_item); return ret; } -int group2group_runtime_commit(void *g2g_runtime, const char *table_name) +int group2group_runtime_commit(void *g2g_runtime, const char *table_name, long long maat_rt_version) { if (NULL == g2g_runtime) { return -1; } struct group2group_runtime *g2g_rt = (struct group2group_runtime *)g2g_runtime; - int ret = group2group_runtime_build_top_groups(g2g_runtime); + int ret = group2group_runtime_build_top_groups(g2g_runtime, maat_rt_version); if (ret < 0) { log_error(g2g_rt->logger, MODULE_GROUP, "[%s:%d] table[%s] group2group runtime commit failed", @@ -629,6 +642,10 @@ int group2group_runtime_commit(void *g2g_runtime, const char *table_name) return -1; } + log_info(g2g_rt->logger, MODULE_GROUP, + "table[%s] commit %zu g2g rules and rebuild top_groups completed, version:%lld", + table_name, g2g_rt->rule_num, g2g_rt->version); + return 0; } diff --git a/src/maat_interval.c b/src/maat_interval.c index 1352309..0c14842 100644 --- a/src/maat_interval.c +++ b/src/maat_interval.c @@ -43,6 +43,7 @@ struct interval_runtime { struct interval_matcher *matcher; struct rcu_hash_table *htable; //store interval rule for rebuild interval_matcher instance struct rcu_hash_table *item_htable; //store this interval table's all maat_item which will be used in interval_runtime_scan + long long version; long long rule_num; struct maat_garbage_bin *ref_garbage_bin; struct log_handle *logger; @@ -160,8 +161,8 @@ void interval_maat_item_free(void *user_ctx, void *data) maat_item_free(item); } -void *interval_runtime_new(void *interval_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *interval_runtime_new(void *interval_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { if (NULL == interval_schema) { @@ -447,7 +448,7 @@ void garbage_interval_matcher_free(void *interval_matcher, void *arg) interval_matcher_free(matcher); } -int interval_runtime_commit(void *interval_runtime, const char *table_name) +int interval_runtime_commit(void *interval_runtime, const char *table_name, long long maat_rt_version) { if (NULL == interval_runtime) { return -1; @@ -480,10 +481,6 @@ int interval_runtime_commit(void *interval_runtime, const char *table_name) } } - log_info(interval_rt->logger, MODULE_INTERVAL, - "table[%s] committing %zu interval rules for rebuilding interval_matcher engine", - table_name, rule_cnt); - int ret = 0; struct interval_matcher *new_interval_matcher = NULL; struct interval_matcher *old_interval_matcher = NULL; @@ -505,8 +502,14 @@ int interval_runtime_commit(void *interval_runtime, const char *table_name) garbage_interval_matcher_free); } rcu_hash_commit(interval_rt->item_htable); + interval_rt->rule_num = rule_cnt; + interval_rt->version = maat_rt_version; + log_info(interval_rt->logger, MODULE_INTERVAL, + "table[%s] commit %zu interval rules and rebuild interval_matcher completed, version:%lld", + table_name, rule_cnt, interval_rt->version); + if (rules != NULL) { FREE(rules); } diff --git a/src/maat_ip.c b/src/maat_ip.c index 7c35682..b716adf 100644 --- a/src/maat_ip.c +++ b/src/maat_ip.c @@ -69,6 +69,7 @@ struct ip_runtime { struct interval_matcher *intval_matcher; struct rcu_hash_table *htable; //store ip rule for rebuild ip_matcher instance struct rcu_hash_table *item_htable; //store this ip table's all maat_item which will be used in ip_runtime_scan + long long version; long long rule_num; struct maat_garbage_bin *ref_garbage_bin; struct log_handle *logger; @@ -374,10 +375,14 @@ void ip_maat_item_free(void *user_ctx, void *data) maat_item_free(item); } -void *ip_runtime_new(void *ip_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *ip_runtime_new(void *ip_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { + if (NULL == ip_schema) { + return NULL; + } + struct ip_runtime *ip_rt = ALLOC(struct ip_runtime, 1); ip_rt->htable = rcu_hash_new(ip_ex_data_free, NULL); @@ -540,7 +545,7 @@ void garbage_ip_matcher_free(void *ip_matcher, void *arg) ip_matcher_free(matcher); } -int ip_runtime_commit(void *ip_runtime, const char *table_name) +int ip_runtime_commit(void *ip_runtime, const char *table_name, long long maat_rt_version) { if (NULL == ip_runtime) { return -1; @@ -569,10 +574,6 @@ int ip_runtime_commit(void *ip_runtime, const char *table_name) } } - log_info(ip_rt->logger, MODULE_IP, - "table[%s] committing %zu ip rules for rebuilding ip_matcher engine", - table_name, rule_cnt); - int ret = 0; size_t mem_used = 0; struct ip_matcher *new_ip_matcher = NULL; @@ -613,8 +614,14 @@ int ip_runtime_commit(void *ip_runtime, const char *table_name) } rcu_hash_commit(ip_rt->item_htable); + ip_rt->rule_num = rule_cnt; + ip_rt->version = maat_rt_version; + log_info(ip_rt->logger, MODULE_IP, + "table[%s] commit %zu ip rules and rebuild ip_matcher completed, version:%lld", + table_name, rule_cnt, ip_rt->version); + if (rules != NULL) { FREE(rules); } diff --git a/src/maat_ip_plugin.c b/src/maat_ip_plugin.c index 914ac3b..d40ae27 100644 --- a/src/maat_ip_plugin.c +++ b/src/maat_ip_plugin.c @@ -39,6 +39,7 @@ struct ip_plugin_schema { struct ip_plugin_runtime { struct ip_matcher *ip_matcher; struct ex_data_runtime *ex_data_rt; + long long version; long long rule_num; struct maat_garbage_bin *ref_garbage_bin; struct log_handle *logger; @@ -346,7 +347,7 @@ int ip_plugin_runtime_update_row(struct ip_plugin_runtime *ip_plugin_rt, } void *ip_plugin_runtime_new(void *ip_plugin_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { if (NULL == ip_plugin_schema) { @@ -439,7 +440,7 @@ int ip_plugin_runtime_update(void *ip_plugin_runtime, void *ip_plugin_schema, return 0; } -int ip_plugin_runtime_commit(void *ip_plugin_runtime, const char *table_name) +int ip_plugin_runtime_commit(void *ip_plugin_runtime, const char *table_name, long long maat_rt_version) { if (NULL == ip_plugin_runtime) { return -1; @@ -458,23 +459,18 @@ int ip_plugin_runtime_commit(void *ip_plugin_runtime, const char *table_name) ex_data_runtime_commit(ex_data_rt); - size_t i = 0; struct ip_rule *rules = NULL; struct ex_container **ex_container = NULL; size_t rule_cnt = ex_data_runtime_list_ex_container(ex_data_rt, &ex_container); if (rule_cnt > 0) { rules = ALLOC(struct ip_rule, rule_cnt); - for (i = 0; i < rule_cnt; i++) { + for (size_t i = 0; i < rule_cnt; i++) { rules[i] = *(struct ip_rule *)ex_container[i]->custom_data; assert(rules[i].user_tag == ex_container[i] || rules[i].user_tag == NULL); rules[i].user_tag = ex_container[i]; } } - log_info(ip_plugin_rt->logger, MODULE_IP_PLUGIN, - "table[%s] committing %zu ip_plugin rules for rebuilding ip_matcher engine", - table_name, rule_cnt); - int ret = 0; size_t mem_used = 0; struct ip_matcher *new_ip_matcher = NULL; @@ -484,7 +480,7 @@ int ip_plugin_runtime_commit(void *ip_plugin_runtime, const char *table_name) new_ip_matcher = ip_matcher_new(rules, rule_cnt, &mem_used); if (NULL == new_ip_matcher) { log_error(ip_plugin_rt->logger, MODULE_IP_PLUGIN, - "[%s:%d] table[%s] rebuild ip_matcher engine failed when update %zu ip_plugin rules", + "[%s:%d] ip_plugin table[%s] rebuild ip_matcher failed when update %zu rules", __FUNCTION__, __LINE__, table_name, rule_cnt); ret = -1; } @@ -498,6 +494,13 @@ int ip_plugin_runtime_commit(void *ip_plugin_runtime, const char *table_name) } ip_plugin_rt->rule_num = rule_cnt; + if (maat_rt_version != 0) { + ip_plugin_rt->version = maat_rt_version; + } + + log_info(ip_plugin_rt->logger, MODULE_IP_PLUGIN, + "table[%s] commit %zu ip_plugin rules and rebuild ip_matcher completed, version:%lld", + table_name, rule_cnt, ip_plugin_rt->version); if (rules != NULL) { FREE(rules); diff --git a/src/maat_plugin.c b/src/maat_plugin.c index a739fcf..e02d246 100644 --- a/src/maat_plugin.c +++ b/src/maat_plugin.c @@ -31,6 +31,7 @@ struct plugin_callback_schema { struct plugin_runtime { long long acc_line_num; struct ex_data_runtime *ex_data_rt; + long long version; long long rule_num; struct maat_garbage_bin *ref_garbage_bin; struct log_handle *logger; @@ -262,8 +263,8 @@ struct ex_container_schema *plugin_table_get_ex_container_schema(void *plugin_sc return &(schema->container_schema); } -void *plugin_runtime_new(void *plugin_schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, +void *plugin_runtime_new(void *plugin_schema, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { if (NULL == plugin_schema) { @@ -272,6 +273,7 @@ void *plugin_runtime_new(void *plugin_schema, int max_thread_num, struct plugin_schema *schema = (struct plugin_schema *)plugin_schema; struct plugin_runtime *plugin_rt = ALLOC(struct plugin_runtime, 1); + plugin_rt->ex_data_rt = ex_data_runtime_new(schema->table_id, logger); if (1 == schema->container_schema.set_flag) { ex_data_runtime_set_ex_container_schema(plugin_rt->ex_data_rt, &(schema->container_schema)); @@ -427,7 +429,7 @@ int plugin_runtime_update(void *plugin_runtime, void *plugin_schema, return 0; } -int plugin_runtime_commit(void *plugin_runtime, const char *table_name) +int plugin_runtime_commit(void *plugin_runtime, const char *table_name, long long maat_rt_version) { if (NULL == plugin_runtime) { return -1; @@ -445,8 +447,16 @@ int plugin_runtime_commit(void *plugin_runtime, const char *table_name) } ex_data_runtime_commit(ex_data_rt); - plugin_rt->rule_num = ex_data_runtime_ex_container_count(ex_data_rt); + plugin_rt->rule_num = ex_data_runtime_ex_container_count(ex_data_rt); + if (maat_rt_version != 0) { + plugin_rt->version = maat_rt_version; + } + + log_info(plugin_rt->logger, MODULE_PLUGIN, + "table[%s] commit %zu plugin rules, version:%lld", + table_name, plugin_rt->rule_num, plugin_rt->version); + return 0; } diff --git a/src/maat_rule.c b/src/maat_rule.c index 2d5df56..21df127 100644 --- a/src/maat_rule.c +++ b/src/maat_rule.c @@ -257,10 +257,10 @@ struct maat_runtime* maat_runtime_create(long long version, struct maat *maat_in } void maat_runtime_commit(struct maat_runtime *maat_rt, int update_type, - struct log_handle *logger) + long long maat_rt_version, struct log_handle *logger) { for (size_t i = 0; i < maat_rt->max_table_num; i++) { - table_manager_commit_runtime(maat_rt->ref_tbl_mgr, i, update_type); + table_manager_commit_runtime(maat_rt->ref_tbl_mgr, i, update_type, maat_rt_version); } maat_rt->last_update_time = time(NULL); @@ -429,7 +429,8 @@ void maat_finish_cb(void *u_param) maat_plugin_table_all_callback_finish(maat_instance->tbl_mgr); if (maat_instance->creating_maat_rt != NULL) { - maat_runtime_commit(maat_instance->creating_maat_rt, MAAT_UPDATE_TYPE_FULL, maat_instance->logger); + maat_runtime_commit(maat_instance->creating_maat_rt, MAAT_UPDATE_TYPE_FULL, + maat_instance->creating_maat_rt->version, maat_instance->logger); maat_instance->creating_maat_rt->rule_num = maat_runtime_rule_num(maat_instance->creating_maat_rt); log_info(maat_instance->logger, MODULE_MAAT_RULE, "Full config version %llu load %d entries complete", @@ -437,7 +438,8 @@ void maat_finish_cb(void *u_param) maat_instance->creating_maat_rt->rule_num); } else if (maat_instance->maat_rt != NULL) { maat_instance->maat_rt->version = maat_instance->maat_version; - maat_runtime_commit(maat_instance->maat_rt, MAAT_UPDATE_TYPE_INC, maat_instance->logger); + maat_runtime_commit(maat_instance->maat_rt, MAAT_UPDATE_TYPE_INC, + maat_instance->maat_rt->version, maat_instance->logger); maat_instance->maat_rt->rule_num = maat_runtime_rule_num(maat_instance->maat_rt); log_info(maat_instance->logger, MODULE_MAAT_RULE, "Inc config version %llu load %d entries complete", @@ -445,7 +447,8 @@ void maat_finish_cb(void *u_param) maat_instance->maat_rt->rule_num); } else { log_info(maat_instance->logger, MODULE_MAAT_RULE, - "Version %d has no valid rules, plugin callback complete.", maat_instance->maat_version); + "Version %d has no valid rules, plugin callback complete.", + maat_instance->maat_version); } maat_instance->new_version = INVALID_VERSION; @@ -557,7 +560,8 @@ void *rule_monitor_loop(void *arg) time_t time_window = time(NULL) - maat_instance->maat_rt->last_update_time; if (time_window >= maat_instance->rule_effect_interval_ms / 1000) { - maat_runtime_commit(maat_instance->maat_rt, MAAT_UPDATE_TYPE_INC, maat_instance->logger); + maat_runtime_commit(maat_instance->maat_rt, MAAT_UPDATE_TYPE_INC, + maat_instance->maat_rt->version, maat_instance->logger); log_info(maat_instance->logger, MODULE_MAAT_RULE, "Actual update config version %u, %d entries load to maat runtime.", maat_instance->maat_rt->version, maat_instance->maat_rt->rule_num); diff --git a/src/maat_table.c b/src/maat_table.c index 8d7016e..be2ce07 100644 --- a/src/maat_table.c +++ b/src/maat_table.c @@ -64,16 +64,19 @@ struct table_operations { enum table_type type; void *(*new_schema)(cJSON *json, struct table_manager *tbl_mgr, const char *table_name, struct log_handle *logger); + void (*free_schema)(void *schema); - void *(*new_runtime)(void *schema, int max_thread_num, - struct maat_garbage_bin *garbage_bin, + void *(*new_runtime)(void *schema, int max_thread_num, struct maat_garbage_bin *garbage_bin, struct log_handle *logger); + void (*free_runtime)(void *runtime); int (*update_runtime)(void *runtime, void *schema, const char *table_name, const char *line, int valid_column); - int (*commit_runtime)(void *runtime, const char *table_name); + + int (*commit_runtime)(void *runtime, const char *table_name, + long long maat_rt_version); long long (*runtime_rule_count)(void *runtime); }; @@ -564,14 +567,14 @@ next: return tbl_mgr; } -void *maat_table_runtime_new(void *schema, enum table_type table_type, - int max_thread_num, struct maat_garbage_bin *garbage_bin, - struct log_handle *logger) +void *maat_table_runtime_new(void *schema, enum table_type table_type, int max_thread_num, + struct maat_garbage_bin *garbage_bin, struct log_handle *logger) { void *runtime = NULL; if (table_ops[table_type].new_runtime != NULL) { - runtime = table_ops[table_type].new_runtime(schema, max_thread_num, garbage_bin, logger); + runtime = table_ops[table_type].new_runtime(schema, max_thread_num, + garbage_bin, logger); } return runtime; @@ -851,7 +854,8 @@ int table_manager_update_runtime(struct table_manager *tbl_mgr, const char *tabl line, valid_column); } -void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id) +void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id, + long long maat_rt_version) { void *updating_rt = table_manager_get_updating_runtime(tbl_mgr, table_id); if (NULL == updating_rt) { @@ -868,7 +872,7 @@ void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id) struct maat_table *ptable = tbl_mgr->tbl[table_id]; if ( table_ops[table_type].commit_runtime != NULL) { - table_ops[table_type].commit_runtime(updating_rt, ptable->table_name); + table_ops[table_type].commit_runtime(updating_rt, ptable->table_name, maat_rt_version); } void *runtime = table_manager_get_runtime(tbl_mgr, table_id); @@ -876,13 +880,15 @@ void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id) if (runtime != NULL) { enum table_type *arg = ALLOC(enum table_type, 1); *arg = table_type; - maat_garbage_bagging(tbl_mgr->ref_garbage_bin, runtime, arg, garbage_maat_table_runtime_free); + maat_garbage_bagging(tbl_mgr->ref_garbage_bin, runtime, arg, + garbage_maat_table_runtime_free); } tbl_mgr->tbl[table_id]->updating_runtime = NULL; } -void table_commit_runtime(struct table_manager *tbl_mgr, int table_id) +void table_commit_runtime(struct table_manager *tbl_mgr, int table_id, + long long maat_rt_version) { void *runtime = table_manager_get_runtime(tbl_mgr, table_id); if (NULL == runtime) { @@ -899,21 +905,21 @@ void table_commit_runtime(struct table_manager *tbl_mgr, int table_id) struct maat_table *ptable = tbl_mgr->tbl[table_id]; if (table_ops[table_type].commit_runtime != NULL) { - table_ops[table_type].commit_runtime(runtime, ptable->table_name); + table_ops[table_type].commit_runtime(runtime, ptable->table_name, maat_rt_version); } } void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id, - int update_type) + int update_type, long long maat_rt_version) { if (NULL == tbl_mgr || table_id < 0) { return; } if (update_type == MAAT_UPDATE_TYPE_FULL) { - table_commit_updating_runtime(tbl_mgr, table_id); + table_commit_updating_runtime(tbl_mgr, table_id, maat_rt_version); } else { - table_commit_runtime(tbl_mgr, table_id); + table_commit_runtime(tbl_mgr, table_id, maat_rt_version); } } diff --git a/test/maat_framework_gtest.cpp b/test/maat_framework_gtest.cpp index fd70607..3755f6b 100644 --- a/test/maat_framework_gtest.cpp +++ b/test/maat_framework_gtest.cpp @@ -337,96 +337,6 @@ static void random_keyword_generate(char *keyword_buf, size_t sz) return; } -#if 0 -class MaatStreamScan : public testing::Test -{ -protected: - static void SetUpTestCase() { - char redis_ip[64] = "127.0.0.1"; - int redis_port = 6379; - int redis_db = 0; - - struct maat_options *opts = maat_options_new(); - maat_options_set_redis(opts, redis_ip, redis_port, redis_db); - maat_options_set_logger(opts, "./maat_framework_gtest.log", LOG_LEVEL_INFO); - - _shared_maat_instance = maat_new(opts, table_info_path); - assert(_shared_maat_instance != NULL); - - maat_cmd_flushDB(_shared_maat_instance); - maat_free(_shared_maat_instance); - - maat_options_set_foreign_cont_dir(opts, "./foreign_files/"); - maat_options_set_rule_effect_interval_ms(opts, 10 * 1000); //20s for garbage collection - _shared_maat_instance = maat_new(opts, table_info_path); - maat_options_free(opts); - } - - static void TearDownTestCase() { - maat_free(_shared_maat_instance); - } - - static struct maat *_shared_maat_instance; -}; - -struct maat *MaatStreamScan::_shared_maat_instance; - -TEST_F(MaatStreamScan, dynamic_config) { - const char *scan_data1 = "hello world cyberessays.com/search_results.php?action=search&query=yulingjing,abckkk,1234567"; - const char *table_name = "HTTP_URL"; - const char *keywords1 = "hello"; - char keyword_buf[128]; - long long results[ARRAY_SIZE] = {0}; - size_t n_hit_result = 0; - int thread_id = 0; - struct maat *maat_instance = MaatStreamScan::_shared_maat_instance; - struct maat_state *state = maat_state_new(maat_instance, thread_id); - - long long compile1_id = maat_cmd_incrby(maat_instance, "TEST_SEQ", 1); - int ret = test_add_expr_command(maat_instance, table_name, compile1_id, 0, keywords1); - EXPECT_EQ(ret, 1); - sleep(WAIT_FOR_EFFECTIVE_S * 2); - - int table_id = maat_get_table_id(maat_instance, table_name); - ASSERT_GT(table_id, 0); - - struct maat_stream *sp = maat_stream_new(maat_instance, table_id, state); - ASSERT_TRUE(sp != NULL); - - ret = maat_stream_scan(sp, "www.cyberessays.com", strlen("www.cyberessays.com"), - results, ARRAY_SIZE, &n_hit_result, state); - EXPECT_EQ(ret, MAAT_SCAN_OK); - - ret = maat_stream_scan(sp, scan_data1, strlen(scan_data1), results, ARRAY_SIZE, - &n_hit_result, state); - EXPECT_EQ(ret, MAAT_SCAN_HIT); - EXPECT_EQ(results[0], compile1_id); - maat_state_reset(state); - - for (int i = 0; i < 100; i++) { - random_keyword_generate(keyword_buf, sizeof(keyword_buf)); - long long compile_id = maat_cmd_incrby(maat_instance, "TEST_SEQ", 1); - ret = test_add_expr_command(maat_instance, table_name, compile_id, 0, keyword_buf); - EXPECT_EQ(ret, 1); - - ret = maat_stream_scan(sp, "www.cyberessays.com", strlen("www.cyberessays.com"), - results, ARRAY_SIZE, &n_hit_result, state); - EXPECT_EQ(ret, MAAT_SCAN_OK); - - ret = maat_stream_scan(sp, scan_data1, strlen(scan_data1), results, ARRAY_SIZE, - &n_hit_result, state); - EXPECT_EQ(ret, MAAT_SCAN_HIT); - EXPECT_EQ(results[0], compile1_id); - maat_state_reset(state); - usleep(500 * 1000); - } - - maat_stream_free(sp); - maat_state_free(state); - sp = NULL; - state = NULL; -} -#endif #if 1 class MaatFlagScan : public testing::Test { @@ -1371,6 +1281,100 @@ TEST_F(MaatStringScan, dynamic_config) { state = NULL; } +class MaatStreamScan : public testing::Test +{ +protected: + static void SetUpTestCase() { + char redis_ip[64] = "127.0.0.1"; + int redis_port = 6379; + int redis_db = 0; + + struct maat_options *opts = maat_options_new(); + maat_options_set_redis(opts, redis_ip, redis_port, redis_db); + maat_options_set_logger(opts, "./maat_framework_gtest.log", LOG_LEVEL_INFO); + + _shared_maat_instance = maat_new(opts, table_info_path); + assert(_shared_maat_instance != NULL); + + maat_cmd_flushDB(_shared_maat_instance); + maat_free(_shared_maat_instance); + + maat_options_set_foreign_cont_dir(opts, "./foreign_files/"); + maat_options_set_rule_effect_interval_ms(opts, 0); + maat_options_set_gc_timeout_ms(opts, 0); // start GC immediately + _shared_maat_instance = maat_new(opts, table_info_path); + maat_options_free(opts); + } + + static void TearDownTestCase() { + maat_free(_shared_maat_instance); + } + + static struct maat *_shared_maat_instance; +}; + +struct maat *MaatStreamScan::_shared_maat_instance; + +TEST_F(MaatStreamScan, dynamic_config) { + const char *scan_data1 = "hello world cyberessays.com/search_results.php?action=search&query=yulingjing,abckkk,1234567"; + const char *table_name = "HTTP_URL"; + const char *keywords1 = "hello"; + char keyword_buf[128]; + long long results[ARRAY_SIZE] = {0}; + size_t n_hit_result = 0; + int thread_id = 0; + struct maat *maat_instance = MaatStreamScan::_shared_maat_instance; + struct maat_state *state = maat_state_new(maat_instance, thread_id); + + // STEP 1: add keywords1 and wait scan stream to hit + long long compile1_id = maat_cmd_incrby(maat_instance, "TEST_SEQ", 1); + int ret = test_add_expr_command(maat_instance, table_name, compile1_id, 0, keywords1); + EXPECT_EQ(ret, 1); + + sleep(WAIT_FOR_EFFECTIVE_S * 2); + + int table_id = maat_get_table_id(maat_instance, table_name); + ASSERT_GT(table_id, 0); + + struct maat_stream *sp = maat_stream_new(maat_instance, table_id, state); + ASSERT_TRUE(sp != NULL); + + ret = maat_stream_scan(sp, "www.cyberessays.com", strlen("www.cyberessays.com"), + results, ARRAY_SIZE, &n_hit_result, state); + EXPECT_EQ(ret, MAAT_SCAN_OK); + + ret = maat_stream_scan(sp, scan_data1, strlen(scan_data1), results, ARRAY_SIZE, + &n_hit_result, state); + EXPECT_EQ(ret, MAAT_SCAN_HIT); + EXPECT_EQ(results[0], compile1_id); + maat_state_reset(state); + + // STEP 2: Inc config update, use same stream to scan and wait old expr_runtime invalid + random_keyword_generate(keyword_buf, sizeof(keyword_buf)); + long long compile_id = maat_cmd_incrby(maat_instance, "TEST_SEQ", 1); + ret = test_add_expr_command(maat_instance, table_name, compile_id, 0, keyword_buf); + EXPECT_EQ(ret, 1); + + // Inc config has not yet taken effect, stream scan can hit compile + ret = maat_stream_scan(sp, scan_data1, strlen(scan_data1), results, ARRAY_SIZE, + &n_hit_result, state); + EXPECT_EQ(ret, MAAT_SCAN_HIT); + EXPECT_EQ(results[0], compile1_id); + maat_state_reset(state); + + sleep(WAIT_FOR_EFFECTIVE_S * 2); + + // Inc config has taken effect, stream reference old expr_runtime, should not hit compile + ret = maat_stream_scan(sp, scan_data1, strlen(scan_data1), results, ARRAY_SIZE, + &n_hit_result, state); + EXPECT_EQ(ret, MAAT_SCAN_OK); + + maat_stream_free(sp); + maat_state_free(state); + sp = NULL; + state = NULL; +} + class MaatIPScan : public testing::Test { protected: