compile/plugin ex_schema support input param table_name

This commit is contained in:
liuwentan
2023-03-29 22:25:14 +08:00
parent 658625fde3
commit 10571d3de4
34 changed files with 369 additions and 242 deletions

View File

@@ -34,6 +34,7 @@ struct compile_schema {
int evaluation_order_column;
struct ex_data_schema *ex_schema;
int table_id; //ugly
char table_name[NAME_MAX];
struct table_manager *ref_tbl_mgr;
unsigned long long update_err_cnt;
@@ -71,6 +72,7 @@ struct compile_runtime {
struct bool_matcher *bm;
struct maat_compile *compile_hash; // <compile_id, struct maat_compile>
struct maat_runtime *ref_maat_rt;
time_t version;
uint32_t rule_num;
struct maat_clause *clause_by_literals_hash;
pthread_rwlock_t rwlock; /* TODO: replaced with mutex? */
@@ -116,6 +118,7 @@ struct compile_sort_para {
struct maat_compile {
unsigned int magic;
long long compile_id;
char table_name[NAME_MAX];
int actual_clause_num;
int declared_clause_num;
int not_clause_cnt;
@@ -136,6 +139,7 @@ struct maat_internal_hit_path {
struct maat_compile_state {
int thread_id;
int Nth_scan;
time_t compile_rt_version;
size_t this_scan_hit_item_cnt;
int not_clause_hitted_flag;
int is_no_count_scan;
@@ -189,12 +193,13 @@ void *compile_runtime_get_user_data(struct compile_runtime *compile_rt, long lon
return ret;
}
void *rule_ex_data_new(int table_id, const char *table_line,
void *rule_ex_data_new(const char *table_name, int table_id, const char *table_line,
const struct ex_data_schema *ex_schema)
{
void *ex_data = NULL;
ex_schema->new_func(table_id, NULL, table_line, &ex_data, ex_schema->argl, ex_schema->argp);
ex_schema->new_func(table_name, table_id, NULL, table_line, &ex_data,
ex_schema->argl, ex_schema->argp);
return ex_data;
}
@@ -204,7 +209,7 @@ void rule_ex_data_free(int table_id, void **ex_data, const struct ex_data_schema
ex_schema->free_func(table_id, ex_data, ex_schema->argl, ex_schema->argp);
}
void rule_ex_data_new_cb(void *user_data, void *param, int table_id)
void rule_ex_data_new_cb(void *user_data, void *param, const char *table_name, int table_id)
{
struct ex_data_schema *ex_schema = (struct ex_data_schema *)param;
struct compile_rule *compile = (struct compile_rule *)user_data;
@@ -214,12 +219,12 @@ void rule_ex_data_new_cb(void *user_data, void *param, int table_id)
// return;
// }
void *ad = rule_ex_data_new(table_id, compile->table_line, ex_schema);
void *ad = rule_ex_data_new(table_name, table_id, compile->table_line, ex_schema);
*compile->ex_data = ad;
}
void compile_runtime_user_data_iterate(struct compile_runtime *compile_rt,
void (*callback)(void *user_data, void *param, int table_id),
void (*callback)(void *user_data, void *param, const char *table_name, int table_id),
void *param, int table_id)
{
struct maat_compile *compile = NULL, *tmp_compile = NULL;
@@ -227,7 +232,7 @@ void compile_runtime_user_data_iterate(struct compile_runtime *compile_rt,
pthread_rwlock_rdlock(&compile_rt->rwlock);
HASH_ITER(hh, compile_rt->compile_hash, compile, tmp_compile) {
if (compile->user_data) {
callback(compile->user_data, param, table_id);
callback(compile->user_data, param, compile->table_name, table_id);
}
}
pthread_rwlock_unlock(&compile_rt->rwlock);
@@ -499,6 +504,7 @@ void *compile_runtime_new(void *compile_schema, int max_thread_num,
struct compile_runtime *compile_rt = ALLOC(struct compile_runtime, 1);
compile_rt->expr_match_buff = ALLOC(struct bool_expr_match, max_thread_num * MAX_SCANNER_HIT_COMPILE_NUM);
compile_rt->version = time(NULL);
compile_rt->clause_by_literals_hash = NULL;
compile_rt->logger = logger;
compile_rt->ref_garbage_bin = garbage_bin;
@@ -645,7 +651,7 @@ int is_valid_table_name(const char *str)
struct group2compile_item *
group2compile_item_new(const char *line, struct group2compile_schema *g2c_schema,
struct log_handle *logger)
const char *table_name, struct log_handle *logger)
{
size_t column_offset = 0;
size_t column_len = 0;
@@ -655,8 +661,8 @@ group2compile_item_new(const char *line, struct group2compile_schema *g2c_schema
int ret = get_column_pos(line, g2c_schema->group_id_column, &column_offset, &column_len);
if (ret < 0) {
log_error(logger, MODULE_COMPILE,
"[%s:%d] group2compile table(table_id:%d) line:%s has no group_id",
__FUNCTION__, __LINE__, g2c_schema->table_id, line);
"[%s:%d] group2compile table:%s line:%s has no group_id",
__FUNCTION__, __LINE__, table_name, line);
goto error;
}
g2c_item->group_id = atoll(line + column_offset);
@@ -664,8 +670,8 @@ group2compile_item_new(const char *line, struct group2compile_schema *g2c_schema
ret = get_column_pos(line, g2c_schema->compile_id_column, &column_offset, &column_len);
if (ret < 0) {
log_error(logger, MODULE_COMPILE,
"[%s:%d] group2compile table(table_id:%d) line:%s has no compile_id",
__FUNCTION__, __LINE__, g2c_schema->table_id, line);
"[%s:%d] group2compile table:%s line:%s has no compile_id",
__FUNCTION__, __LINE__, table_name, line);
goto error;
}
g2c_item->compile_id = atoll(line + column_offset);
@@ -673,8 +679,8 @@ group2compile_item_new(const char *line, struct group2compile_schema *g2c_schema
ret = get_column_pos(line, g2c_schema->not_flag_column, &column_offset, &column_len);
if (ret < 0) {
log_error(logger, MODULE_COMPILE,
"[%s:%d] group2compile table(table_id:%d) line:%s has no NOT_flag",
__FUNCTION__, __LINE__, g2c_schema->table_id, line);
"[%s:%d] group2compile table:%s line:%s has no NOT_flag",
__FUNCTION__, __LINE__, table_name, line);
goto error;
}
g2c_item->not_flag = atoi(line + column_offset);
@@ -682,15 +688,15 @@ group2compile_item_new(const char *line, struct group2compile_schema *g2c_schema
ret = get_column_pos(line, g2c_schema->vtable_name_column, &column_offset, &column_len);
if (ret < 0) {
log_error(logger, MODULE_COMPILE,
"[%s:%d] group2compile table(table_id:%d) line:%s has no virtual_table_name",
__FUNCTION__, __LINE__, g2c_schema->table_id, line);
"[%s:%d] group2compile table:%s line:%s has no virtual_table_name",
__FUNCTION__, __LINE__, table_name, line);
goto error;
}
if (column_len > NAME_MAX) {
log_error(logger, MODULE_COMPILE,
"[%s:%d] group2compile table(table_id:%d) line:%s virtual_table_name length too long",
__FUNCTION__, __LINE__, g2c_schema->table_id, line);
"[%s:%d] group2compile table:%s line:%s virtual_table_name length too long",
__FUNCTION__, __LINE__, table_name, line);
goto error;
}
@@ -700,8 +706,8 @@ group2compile_item_new(const char *line, struct group2compile_schema *g2c_schema
g2c_item->vtable_id = table_manager_get_table_id(g2c_schema->ref_tbl_mgr, vtable_name);
if (g2c_item->vtable_id < 0) {
log_error(logger, MODULE_COMPILE,
"[%s:%d] group2compile table(table_id:%d) line:%s unknown virtual table:%s",
__FUNCTION__, __LINE__, g2c_schema->table_id, line, vtable_name);
"[%s:%d] group2compile table:%s line:%s unknown virtual table:%s",
__FUNCTION__, __LINE__, table_name, line, vtable_name);
goto error;
}
}
@@ -709,8 +715,8 @@ group2compile_item_new(const char *line, struct group2compile_schema *g2c_schema
ret = get_column_pos(line, g2c_schema->clause_index_column, &column_offset, &column_len);
if (ret < 0) {
log_error(logger, MODULE_COMPILE,
"[%s:%d] group2compile table(table_id:%d) line:%s has no clause_index",
__FUNCTION__, __LINE__, g2c_schema->table_id, line);
"[%s:%d] group2compile table:%s line:%s has no clause_index",
__FUNCTION__, __LINE__, table_name, line);
goto error;
}
@@ -748,13 +754,16 @@ struct maat_compile *maat_compile_new(long long compile_id)
return compile;
}
int maat_compile_set(struct maat_compile *compile, int declared_clause_num,
void *user_data, void (*user_data_free)(void *))
int maat_compile_set(struct maat_compile *compile, const char *table_name,
int declared_clause_num, void *user_data,
void (*user_data_free)(void *))
{
if (user_data != NULL && NULL == user_data_free) {
return -1;
}
memset(compile->table_name, 0, sizeof(compile->table_name));
memcpy(compile->table_name, table_name, sizeof(compile->table_name));
compile->declared_clause_num = declared_clause_num;
compile->user_data = user_data;
compile->user_data_free = user_data_free;
@@ -784,8 +793,14 @@ int maat_compile_hash_add(struct maat_compile **compile_hash, long long compile_
return ret;
}
void garbage_maat_compile_free(void *maat_compile, void *arg)
{
struct maat_compile *compile = (struct maat_compile *)maat_compile;
maat_compile_free(compile);
}
void maat_compile_hash_set(struct maat_compile **compile_hash, long long compile_id,
struct maat_compile *compile)
const char *table_name, struct maat_compile *compile)
{
struct maat_compile *tmp_compile = NULL;
@@ -793,7 +808,7 @@ void maat_compile_hash_set(struct maat_compile **compile_hash, long long compile
assert(tmp_compile != NULL);
assert(tmp_compile->user_data == NULL);
maat_compile_set(tmp_compile, compile->declared_clause_num,
maat_compile_set(tmp_compile, table_name, compile->declared_clause_num,
compile->user_data, compile->user_data_free);
}
@@ -808,7 +823,7 @@ int maat_compile_hash_remove(struct maat_compile **compile_hash, struct maat_com
if (0 == compile->actual_clause_num) {
HASH_DEL(*compile_hash, compile);
maat_garbage_bagging(garbage_bin, compile, (void (*)(void *))maat_compile_free);
maat_garbage_bagging(garbage_bin, compile, NULL, garbage_maat_compile_free);
}
//TODO:mytest need to delete
@@ -1033,11 +1048,6 @@ error:
return bm;
}
void maat_compile_bool_matcher_free(struct bool_matcher *bm)
{
bool_matcher_free(bm);
}
static int maat_compile_has_clause(struct maat_compile *compile, long long clause_id)
{
struct maat_clause_state *clause_state = NULL;
@@ -1083,6 +1093,16 @@ size_t maat_compile_bool_matcher_match(struct compile_runtime *compile_rt, int i
struct bool_expr_match *expr_match = compile_rt->expr_match_buff + compile_state->thread_id * MAX_SCANNER_HIT_COMPILE_NUM;
assert(compile_state->thread_id >= 0);
if (0 == compile_state->compile_rt_version) {
compile_state->compile_rt_version = compile_rt->version;
}
if (NULL == compile_rt->bm || 0 == utarray_len(compile_state->all_hit_clauses)
|| compile_state->compile_rt_version != compile_rt->version) {
compile_state->this_scan_hit_item_cnt = 0;
return 0;
}
//TODO:mytest need to delete
#if 0
unsigned long long *p = utarray_eltptr(compile_state->all_hit_clauses, 0);
@@ -1181,7 +1201,7 @@ int maat_remove_group_from_compile(struct maat_compile **compile_hash,
if (0 == compile->actual_clause_num && NULL == compile->user_data) {
HASH_DEL(*compile_hash, compile);
maat_garbage_bagging(garbage_bin, compile, (void (*)(void*))maat_compile_free);
maat_garbage_bagging(garbage_bin, compile, NULL, garbage_maat_compile_free);
}
return 0;
@@ -1223,6 +1243,7 @@ void maat_compile_state_reset(struct maat_compile_state *compile_state)
}
compile_state->Nth_scan = 0;
compile_state->compile_rt_version = 0;
compile_state->this_scan_hit_item_cnt = 0;
compile_state->not_clause_hitted_flag = 0;
compile_state->is_no_count_scan = 0;
@@ -1311,6 +1332,10 @@ size_t compile_runtime_get_hit_paths(struct compile_runtime *compile_rt,
struct bool_expr_match *expr_match = compile_rt->expr_match_buff + compile_state->thread_id * MAX_SCANNER_HIT_COMPILE_NUM;
assert(compile_state->thread_id >= 0);
if (compile_state->compile_rt_version != compile_rt->version) {
return 0;
}
pthread_rwlock_rdlock(&compile_rt->rwlock);
int bool_match_ret = bool_matcher_match(compile_rt->bm,
(unsigned long long *)utarray_eltptr(compile_state->all_hit_clauses, 0),
@@ -1436,6 +1461,7 @@ int maat_compile_state_has_NOT_clause(struct maat_compile_state *compile_state)
void compile_item_to_compile_rule(struct compile_item *compile_item,
struct compile_schema *compile_schema,
struct compile_rule *compile_rule,
const char *table_name,
const char *table_line)
{
compile_rule->magic_num = COMPILE_RULE_MAGIC;
@@ -1448,8 +1474,9 @@ void compile_item_to_compile_rule(struct compile_item *compile_item,
compile_rule->evaluation_order = compile_item->evaluation_order;
if (compile_schema->ex_schema != NULL) {
*(compile_rule->ex_data) = rule_ex_data_new(compile_schema->table_id, compile_rule->table_line,
compile_schema->ex_schema);
*(compile_rule->ex_data) = rule_ex_data_new(table_name, compile_schema->table_id,
compile_rule->table_line,
compile_schema->ex_schema);
}
compile_rule->compile_id = compile_item->compile_id;
@@ -1507,7 +1534,8 @@ void *compile_runtime_get_ex_data(struct compile_runtime *compile_rt,
}
int compile_runtime_update(void *compile_runtime, void *compile_schema,
const char *line, int valid_column)
const char *table_name, const char *line,
int valid_column)
{
if (NULL == compile_runtime || NULL == compile_schema ||
NULL == line) {
@@ -1543,8 +1571,8 @@ int compile_runtime_update(void *compile_runtime, void *compile_schema,
pthread_rwlock_unlock(&compile_rt->rwlock);
if (ret < 0) {
log_error(compile_rt->logger, MODULE_COMPILE,
"[%s:%d] remove compile table(table_id:%d) compile(compile_id:%lld) from compile_hash failed",
__FUNCTION__, __LINE__, schema->table_id, compile_id);
"[%s:%d] remove compile table:%s compile(compile_id:%lld) from compile_hash failed",
__FUNCTION__, __LINE__, table_name, compile_id);
return -1;
}
} else {
@@ -1558,7 +1586,7 @@ int compile_runtime_update(void *compile_runtime, void *compile_schema,
}
struct compile_rule *compile_rule = ALLOC(struct compile_rule, 1);
compile_item_to_compile_rule(compile_item, schema, compile_rule, line);
compile_item_to_compile_rule(compile_item, schema, compile_rule, table_name, line);
compile_item_free(compile_item);
compile_item = NULL;
@@ -1567,16 +1595,16 @@ int compile_runtime_update(void *compile_runtime, void *compile_schema,
destroy_compile_rule(compile_rule);
pthread_rwlock_unlock(&compile_rt->rwlock);
log_error(compile_rt->logger, MODULE_COMPILE,
"[%s:%d] maat_compile_new failed, compile_table(table_id:%d) compile_id:%d",
__FUNCTION__, __LINE__, schema->table_id, compile_item->compile_id);
"[%s:%d] maat_compile_new failed, compile_table:%s compile_id:%d",
__FUNCTION__, __LINE__, table_name, compile_item->compile_id);
return -1;
}
maat_compile_set(compile, compile_rule->declared_clause_num, compile_rule,
maat_compile_set(compile, table_name, compile_rule->declared_clause_num, compile_rule,
(void (*)(void *))destroy_compile_rule);
struct maat_compile *tmp_compile = maat_compile_hash_find(&(compile_rt->compile_hash), compile_id);
if (tmp_compile != NULL) {
maat_compile_hash_set(&(compile_rt->compile_hash), compile_id, compile);
maat_compile_hash_set(&(compile_rt->compile_hash), compile_id, table_name, compile);
} else {
maat_compile_hash_add(&(compile_rt->compile_hash), compile_id, compile);
}
@@ -1590,7 +1618,8 @@ int compile_runtime_update(void *compile_runtime, void *compile_schema,
}
int group2compile_runtime_update(void *g2c_runtime, void *g2c_schema,
const char *line, int valid_column)
const char *table_name, const char *line,
int valid_column)
{
if (NULL == g2c_runtime || NULL == g2c_schema || NULL == line) {
return -1;
@@ -1606,7 +1635,8 @@ int group2compile_runtime_update(void *g2c_runtime, void *g2c_schema,
}
int ret = -1;
struct group2compile_item *g2c_item = group2compile_item_new(line, schema, compile_rt->logger);
struct group2compile_item *g2c_item = group2compile_item_new(line, schema, table_name,
compile_rt->logger);
if (NULL == g2c_item) {
return -1;
}
@@ -1691,8 +1721,8 @@ int compile_runtime_commit(void *compile_runtime, const char *table_name)
compile_rt->bm = new_bool_matcher;
pthread_rwlock_unlock(&compile_rt->rwlock);
maat_garbage_bagging(compile_rt->ref_garbage_bin, old_bool_matcher,
(void (*)(void*))maat_compile_bool_matcher_free);
maat_garbage_bagging(compile_rt->ref_garbage_bin, old_bool_matcher, NULL,
garbage_bool_matcher_free);
compile_rt->rule_num = compile_cnt;