This commit is contained in:
liuwentan
2023-04-13 14:56:35 +08:00
parent 2c787fd231
commit 571ce08d3b
25 changed files with 341 additions and 241 deletions

View File

@@ -64,16 +64,19 @@ struct table_operations {
enum table_type type;
void *(*new_schema)(cJSON *json, struct table_manager *tbl_mgr,
const char *table_name, struct log_handle *logger);
void (*free_schema)(void *schema);
void *(*new_runtime)(void *schema, int max_thread_num,
struct maat_garbage_bin *garbage_bin,
void *(*new_runtime)(void *schema, int max_thread_num, struct maat_garbage_bin *garbage_bin,
struct log_handle *logger);
void (*free_runtime)(void *runtime);
int (*update_runtime)(void *runtime, void *schema, const char *table_name,
const char *line, int valid_column);
int (*commit_runtime)(void *runtime, const char *table_name);
int (*commit_runtime)(void *runtime, const char *table_name,
long long maat_rt_version);
long long (*runtime_rule_count)(void *runtime);
};
@@ -564,14 +567,14 @@ next:
return tbl_mgr;
}
void *maat_table_runtime_new(void *schema, enum table_type table_type,
int max_thread_num, struct maat_garbage_bin *garbage_bin,
struct log_handle *logger)
void *maat_table_runtime_new(void *schema, enum table_type table_type, int max_thread_num,
struct maat_garbage_bin *garbage_bin, struct log_handle *logger)
{
void *runtime = NULL;
if (table_ops[table_type].new_runtime != NULL) {
runtime = table_ops[table_type].new_runtime(schema, max_thread_num, garbage_bin, logger);
runtime = table_ops[table_type].new_runtime(schema, max_thread_num,
garbage_bin, logger);
}
return runtime;
@@ -851,7 +854,8 @@ int table_manager_update_runtime(struct table_manager *tbl_mgr, const char *tabl
line, valid_column);
}
void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id)
void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id,
long long maat_rt_version)
{
void *updating_rt = table_manager_get_updating_runtime(tbl_mgr, table_id);
if (NULL == updating_rt) {
@@ -868,7 +872,7 @@ void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id)
struct maat_table *ptable = tbl_mgr->tbl[table_id];
if ( table_ops[table_type].commit_runtime != NULL) {
table_ops[table_type].commit_runtime(updating_rt, ptable->table_name);
table_ops[table_type].commit_runtime(updating_rt, ptable->table_name, maat_rt_version);
}
void *runtime = table_manager_get_runtime(tbl_mgr, table_id);
@@ -876,13 +880,15 @@ void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id)
if (runtime != NULL) {
enum table_type *arg = ALLOC(enum table_type, 1);
*arg = table_type;
maat_garbage_bagging(tbl_mgr->ref_garbage_bin, runtime, arg, garbage_maat_table_runtime_free);
maat_garbage_bagging(tbl_mgr->ref_garbage_bin, runtime, arg,
garbage_maat_table_runtime_free);
}
tbl_mgr->tbl[table_id]->updating_runtime = NULL;
}
void table_commit_runtime(struct table_manager *tbl_mgr, int table_id)
void table_commit_runtime(struct table_manager *tbl_mgr, int table_id,
long long maat_rt_version)
{
void *runtime = table_manager_get_runtime(tbl_mgr, table_id);
if (NULL == runtime) {
@@ -899,21 +905,21 @@ void table_commit_runtime(struct table_manager *tbl_mgr, int table_id)
struct maat_table *ptable = tbl_mgr->tbl[table_id];
if (table_ops[table_type].commit_runtime != NULL) {
table_ops[table_type].commit_runtime(runtime, ptable->table_name);
table_ops[table_type].commit_runtime(runtime, ptable->table_name, maat_rt_version);
}
}
void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id,
int update_type)
int update_type, long long maat_rt_version)
{
if (NULL == tbl_mgr || table_id < 0) {
return;
}
if (update_type == MAAT_UPDATE_TYPE_FULL) {
table_commit_updating_runtime(tbl_mgr, table_id);
table_commit_updating_runtime(tbl_mgr, table_id, maat_rt_version);
} else {
table_commit_runtime(tbl_mgr, table_id);
table_commit_runtime(tbl_mgr, table_id, maat_rt_version);
}
}