interrupt execution if table schema has error

This commit is contained in:
liuwentan
2023-03-23 19:16:23 +08:00
parent 2ce749d9bc
commit 7b49d7d52f
9 changed files with 149 additions and 186 deletions

View File

@@ -151,6 +151,7 @@ struct serial_rule {
char *table_line;
int n_foreign;
struct foreign_key *f_keys;
redisContext *ref_ctx;
TAILQ_ENTRY(serial_rule) entries;
UT_hash_handle hh;
};

View File

@@ -957,8 +957,7 @@ int flag_scan(struct table_manager *tbl_mgr, int thread_id, long long flag,
int physical_table_id, int vtable_id, struct maat_state *state)
{
enum table_type table_type = table_manager_get_table_type(tbl_mgr, physical_table_id);
if ((table_type == TABLE_TYPE_FLAG_PLUS) &&
(NULL == state || DISTRICT_FLAG_UNSET == state->is_set_district)) {
if (table_type == TABLE_TYPE_FLAG_PLUS && DISTRICT_FLAG_UNSET == state->is_set_district) {
return -1;
}
@@ -989,8 +988,7 @@ int interval_scan(struct table_manager *tbl_mgr, int thread_id, long long intege
{
enum table_type table_type = table_manager_get_table_type(tbl_mgr, physical_table_id);
if ((table_type == TABLE_TYPE_INTERVAL_PLUS) &&
(NULL == state || DISTRICT_FLAG_UNSET == state->is_set_district)) {
if (table_type == TABLE_TYPE_INTERVAL_PLUS && DISTRICT_FLAG_UNSET == state->is_set_district) {
// maat_instance->scan_err_cnt++;
return -1;
}
@@ -1075,8 +1073,7 @@ int string_scan(struct table_manager *tbl_mgr, int thread_id, const char *data,
int physical_table_id, int vtable_id, struct maat_state *state)
{
enum table_type table_type = table_manager_get_table_type(tbl_mgr, physical_table_id);
if ((table_type == TABLE_TYPE_EXPR_PLUS) &&
(NULL == state || DISTRICT_FLAG_UNSET == state->is_set_district)) {
if (table_type == TABLE_TYPE_EXPR_PLUS && DISTRICT_FLAG_UNSET == state->is_set_district) {
// maat_instance->scan_err_cnt++;
return -1;
}
@@ -1112,8 +1109,7 @@ int expr_stream_scan(struct maat_stream *stream, const char *data, size_t data_l
struct table_manager *tbl_mgr = stream->ref_maat_instance->tbl_mgr;
enum table_type table_type = table_manager_get_table_type(tbl_mgr, stream->physical_table_id);
if ((table_type == TABLE_TYPE_EXPR_PLUS) &&
(NULL == state || DISTRICT_FLAG_UNSET == state->is_set_district)) {
if (table_type == TABLE_TYPE_EXPR_PLUS && DISTRICT_FLAG_UNSET == state->is_set_district) {
// maat_instance->scan_err_cnt++;
return -1;
}
@@ -1698,7 +1694,6 @@ void maat_state_free(struct maat_state *state)
assert(state->compile_state != NULL);
maat_compile_state_free(state->compile_state);
state->compile_state = NULL;
//alignment_int64_array_add(mid->maat_instance->compile_state_cnt, mid->thread_id, -1);
state->maat_instance = NULL;
free(state);
}

View File

@@ -490,12 +490,7 @@ table_manager_create(const char *table_info_path, const char *accept_tags,
"[%s:%d] register_tablename2id failed", __FUNCTION__, __LINE__);
FREE(json_buff);
cJSON_Delete(root);
maat_kv_store_free(tbl_mgr->tablename2id_map);
for (int idx = 0; idx < tbl_mgr->n_accept_tag; idx++) {
FREE(tbl_mgr->accept_tags[idx].tag_name);
FREE(tbl_mgr->accept_tags[idx].tag_val);
}
FREE(tbl_mgr);
table_manager_destroy(tbl_mgr);
return NULL;
}
}
@@ -512,7 +507,8 @@ table_manager_create(const char *table_info_path, const char *accept_tags,
if (json != NULL && json->type == cJSON_Object) {
struct maat_table *maat_tbl = maat_table_new(json, reserved_word_map, logger);
if (NULL == maat_tbl) {
continue;
ret = -1;
goto next;
}
maat_tbl->schema = maat_table_schema_new(json, maat_tbl->table_name,
@@ -521,8 +517,8 @@ table_manager_create(const char *table_info_path, const char *accept_tags,
log_error(logger, MODULE_TABLE,
"[%s:%d] Maat table schema new failed, table_name:%s",
__FUNCTION__, __LINE__, maat_tbl->table_name);
maat_table_free(maat_tbl);
continue;
ret = -1;
goto next;
}
log_info(logger, MODULE_TABLE, "successfully register table[%s]->table_id:%d",
maat_tbl->table_name, maat_tbl->table_id);
@@ -547,11 +543,16 @@ table_manager_create(const char *table_info_path, const char *accept_tags,
log_info(logger, MODULE_TABLE, "default compile table id: %d", default_compile_table_id);
log_info(logger, MODULE_TABLE, "group2group table id: %d", g2g_table_id);
next:
FREE(json_buff);
maat_kv_store_free(reserved_word_map);
cJSON_Delete(root);
if (ret < 0) {
table_manager_destroy(tbl_mgr);
return NULL;
}
return tbl_mgr;
}
@@ -628,13 +629,12 @@ void table_manager_runtime_destroy(struct table_manager *tbl_mgr)
for(size_t i = 0; i < MAX_TABLE_NUM; i++) {
void *runtime = table_manager_get_runtime(tbl_mgr, i);
if (NULL == runtime) {
continue;
if (runtime != NULL) {
enum table_type table_type = table_manager_get_table_type(tbl_mgr, i);
assert(table_type != TABLE_TYPE_INVALID);
maat_table_runtime_free(runtime, table_type);
tbl_mgr->tbl[i]->runtime = NULL;
}
enum table_type table_type = table_manager_get_table_type(tbl_mgr, i);
maat_table_runtime_free(runtime, table_type);
tbl_mgr->tbl[i]->runtime = NULL;
}
/* free district map */
@@ -647,30 +647,28 @@ void table_manager_destroy(struct table_manager *tbl_mgr)
return;
}
for (size_t i = 0; i < MAX_TABLE_NUM; i++) {
void *runtime = table_manager_get_runtime(tbl_mgr, i);
assert(NULL == runtime);
void *schema = table_manager_get_schema(tbl_mgr, i);
if (NULL == schema) {
continue;
}
enum table_type table_type = table_manager_get_table_type(tbl_mgr, i);
maat_table_schema_free(schema, table_type);
tbl_mgr->tbl[i]->schema = NULL;
size_t i = 0;
for (i = 0; i < MAX_TABLE_NUM; i++) {
maat_table_free(tbl_mgr->tbl[i]);
tbl_mgr->tbl[i] = NULL;
}
for (size_t i = 0; i < tbl_mgr->n_accept_tag; i++) {
FREE(tbl_mgr->accept_tags[i].tag_name);
FREE(tbl_mgr->accept_tags[i].tag_val);
}
FREE(tbl_mgr->accept_tags);
if (tbl_mgr->accept_tags != NULL) {
for (i = 0; i < tbl_mgr->n_accept_tag; i++) {
if (tbl_mgr->accept_tags[i].tag_name != NULL) {
FREE(tbl_mgr->accept_tags[i].tag_name);
}
if (tbl_mgr->accept_tags[i].tag_val != NULL) {
FREE(tbl_mgr->accept_tags[i].tag_val);
}
}
FREE(tbl_mgr->accept_tags);
}
maat_kv_store_free(tbl_mgr->tablename2id_map);
maat_kv_store_free(tbl_mgr->tablename2id_map);
tbl_mgr->tablename2id_map = NULL;
FREE(tbl_mgr);
}