refactor rcu table runtime

This commit is contained in:
liuwentan
2023-04-12 20:48:19 +08:00
parent f213fcbe97
commit 2c787fd231
4 changed files with 123 additions and 58 deletions

View File

@@ -68,9 +68,11 @@ int table_manager_accept_tags_match(struct table_manager *tbl_mgr, const char *t
void *table_manager_get_schema(struct table_manager *tbl_mgr, int table_id);
void *table_manager_get_runtime(struct table_manager *tbl_mgr, int table_id);
void *table_manager_get_updating_runtime(struct table_manager *tbl_mgr, int table_id);
int table_manager_update_runtime(struct table_manager *tbl_mgr, const char *table_name, int table_id, const char *line);
void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id);
int table_manager_update_runtime(struct table_manager *tbl_mgr, const char *table_name,
int table_id, const char *line, int update_type);
void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id, int update_type);
long long table_manager_runtime_rule_count(struct table_manager *tbl_mgr, int table_id);

View File

@@ -603,10 +603,7 @@ void compile_runtime_init(void *compile_runtime, struct maat_runtime *maat_rt)
}
struct compile_runtime *compile_rt = (struct compile_runtime *)compile_runtime;
pthread_rwlock_wrlock(&compile_rt->rwlock);
compile_rt->ref_maat_rt = maat_rt;
pthread_rwlock_unlock(&compile_rt->rwlock);
}
void *group2compile_runtime_new(void *g2c_schema, int max_thread_num,

View File

@@ -256,10 +256,11 @@ struct maat_runtime* maat_runtime_create(long long version, struct maat *maat_in
return maat_rt;
}
void maat_runtime_commit(struct maat_runtime *maat_rt, struct log_handle *logger)
void maat_runtime_commit(struct maat_runtime *maat_rt, int update_type,
struct log_handle *logger)
{
for (size_t i = 0; i < maat_rt->max_table_num; i++) {
table_manager_commit_runtime(maat_rt->ref_tbl_mgr, i);
table_manager_commit_runtime(maat_rt->ref_tbl_mgr, i, update_type);
}
maat_rt->last_update_time = time(NULL);
@@ -326,36 +327,31 @@ void maat_start_cb(long long new_version, int update_type, void *u_param)
{
struct maat *maat_instance = (struct maat *)u_param;
maat_instance->new_version = new_version;
size_t i = 0;
size_t table_cnt = table_manager_table_count(maat_instance->tbl_mgr);
enum table_type table_type = TABLE_TYPE_INVALID;
if (update_type == MAAT_UPDATE_TYPE_FULL) {
maat_instance->creating_maat_rt = maat_runtime_create(new_version, maat_instance);
} else {
for (i = 0; i < table_cnt; i++) {
table_type = table_manager_get_table_type(maat_instance->tbl_mgr, i);
if (table_type == TABLE_TYPE_COMPILE) {
// compile runtime need a reference to maat runtime
void *compile_rt = table_manager_get_updating_runtime(maat_instance->tbl_mgr, i);
compile_runtime_init(compile_rt, maat_instance->creating_maat_rt);
}
}
} else {
maat_instance->maat_version = new_version;
}
struct maat_runtime *maat_rt = NULL;
if (maat_instance->creating_maat_rt != NULL) {
maat_rt = maat_instance->creating_maat_rt;
} else {
maat_rt = maat_instance->maat_rt;
}
size_t table_cnt = table_manager_table_count(maat_instance->tbl_mgr);
for (size_t i = 0; i < table_cnt; i++) {
enum table_type table_type = table_manager_get_table_type(maat_instance->tbl_mgr, i);
if (table_type == TABLE_TYPE_COMPILE) {
//compile runtime need a reference to maat runtime
void *compile_rt = table_manager_get_runtime(maat_instance->tbl_mgr, i);
compile_runtime_init(compile_rt, maat_rt);
for (i = 0; i < table_cnt; i++) {
table_type = table_manager_get_table_type(maat_instance->tbl_mgr, i);
if (table_type == TABLE_TYPE_PLUGIN) {
void *schema = table_manager_get_schema(maat_instance->tbl_mgr, i);
plugin_table_all_callback_start((struct plugin_schema *)schema, update_type);
}
if (table_type != TABLE_TYPE_PLUGIN) {
continue;
}
void *schema = table_manager_get_schema(maat_instance->tbl_mgr, i);
plugin_table_all_callback_start((struct plugin_schema *)schema, update_type);
}
}
}
int maat_update_cb(const char *table_name, const char *line, void *u_param)
@@ -382,13 +378,15 @@ int maat_update_cb(const char *table_name, const char *line, void *u_param)
return -1;
}
if (maat_instance->creating_maat_rt != NULL) {
int update_type = MAAT_UPDATE_TYPE_INC;
if (maat_instance->creating_maat_rt != NULL) { //Full update
maat_rt = maat_instance->creating_maat_rt;
update_type = MAAT_UPDATE_TYPE_FULL;
} else {
maat_rt = maat_instance->maat_rt;
}
table_manager_update_runtime(maat_rt->ref_tbl_mgr, table_name, table_id, line);
table_manager_update_runtime(maat_rt->ref_tbl_mgr, table_name, table_id, line, update_type);
return 0;
}
@@ -431,7 +429,7 @@ void maat_finish_cb(void *u_param)
maat_plugin_table_all_callback_finish(maat_instance->tbl_mgr);
if (maat_instance->creating_maat_rt != NULL) {
maat_runtime_commit(maat_instance->creating_maat_rt, maat_instance->logger);
maat_runtime_commit(maat_instance->creating_maat_rt, MAAT_UPDATE_TYPE_FULL, maat_instance->logger);
maat_instance->creating_maat_rt->rule_num = maat_runtime_rule_num(maat_instance->creating_maat_rt);
log_info(maat_instance->logger, MODULE_MAAT_RULE,
"Full config version %llu load %d entries complete",
@@ -439,7 +437,7 @@ void maat_finish_cb(void *u_param)
maat_instance->creating_maat_rt->rule_num);
} else if (maat_instance->maat_rt != NULL) {
maat_instance->maat_rt->version = maat_instance->maat_version;
maat_runtime_commit(maat_instance->maat_rt, maat_instance->logger);
maat_runtime_commit(maat_instance->maat_rt, MAAT_UPDATE_TYPE_INC, maat_instance->logger);
maat_instance->maat_rt->rule_num = maat_runtime_rule_num(maat_instance->maat_rt);
log_info(maat_instance->logger, MODULE_MAAT_RULE,
"Inc config version %llu load %d entries complete",
@@ -559,7 +557,7 @@ void *rule_monitor_loop(void *arg)
time_t time_window = time(NULL) - maat_instance->maat_rt->last_update_time;
if (time_window >= maat_instance->rule_effect_interval_ms / 1000) {
maat_runtime_commit(maat_instance->maat_rt, maat_instance->logger);
maat_runtime_commit(maat_instance->maat_rt, MAAT_UPDATE_TYPE_INC, maat_instance->logger);
log_info(maat_instance->logger, MODULE_MAAT_RULE,
"Actual update config version %u, %d entries load to maat runtime.",
maat_instance->maat_rt->version, maat_instance->maat_rt->rule_num);

View File

@@ -42,6 +42,7 @@ struct maat_table {
int valid_column;
void *schema;
void *runtime;
void *updating_runtime;
};
struct table_manager {
@@ -600,37 +601,40 @@ int table_manager_runtime_create(struct table_manager *tbl_mgr, int max_thread_n
continue;
}
assert(tbl_mgr->tbl[i]->updating_runtime == NULL);
table_type = table_manager_get_table_type(tbl_mgr, i);
void *runtime = table_manager_get_runtime(tbl_mgr, i);
tbl_mgr->tbl[i]->runtime = maat_table_runtime_new(schema, table_type, max_thread_num,
garbage_bin, tbl_mgr->logger);
if (runtime != NULL) {
enum table_type *arg = ALLOC(enum table_type, 1);
*arg = table_type;
maat_garbage_bagging(garbage_bin, runtime, arg, garbage_maat_table_runtime_free);
}
assert(table_type != TABLE_TYPE_INVALID);
tbl_mgr->tbl[i]->updating_runtime = maat_table_runtime_new(schema, table_type, max_thread_num,
garbage_bin, tbl_mgr->logger);
}
/* group2compile runtime depends on associated compile runtime,
must make sure associated compile runtime already exist */
for (i = 0; i < MAX_TABLE_NUM; i++) {
void *runtime = table_manager_get_runtime(tbl_mgr, i);
if (NULL == runtime) {
continue;
}
table_type = table_manager_get_table_type(tbl_mgr, i);
if (table_type != TABLE_TYPE_GROUP2COMPILE) {
continue;
}
void *schema = table_manager_get_schema(tbl_mgr, i);
if (NULL == schema) {
log_error(tbl_mgr->logger, MODULE_TABLE,
"[%s:%d] group2compile table(table_id:%d) schema is null",
__FUNCTION__, __LINE__, i);
continue;
}
void *g2c_updating_rt = table_manager_get_updating_runtime(tbl_mgr, i);
if (NULL == g2c_updating_rt) {
continue;
}
int associated_compile_table_id = group2compile_associated_compile_table_id(schema);
void *compile_rt = table_manager_get_runtime(tbl_mgr, associated_compile_table_id);
void *compile_updating_rt = table_manager_get_updating_runtime(tbl_mgr, associated_compile_table_id);
int g2g_group_id = table_manager_get_group2group_table_id(tbl_mgr);
void *g2g_rt = table_manager_get_runtime(tbl_mgr, g2g_group_id);
group2compile_runtime_init(runtime, compile_rt, g2g_rt);
void *g2g_updating_rt = table_manager_get_updating_runtime(tbl_mgr, g2g_group_id);
group2compile_runtime_init(g2c_updating_rt, compile_updating_rt, g2g_updating_rt);
}
return 0;
@@ -788,15 +792,34 @@ void *table_manager_get_runtime(struct table_manager *tbl_mgr, int table_id)
return tbl_mgr->tbl[table_id]->runtime;
}
void *table_manager_get_updating_runtime(struct table_manager *tbl_mgr, int table_id)
{
if (NULL == tbl_mgr || (table_id < 0) || (table_id >= MAX_TABLE_NUM)) {
return NULL;
}
if (NULL == tbl_mgr->tbl[table_id]) {
return NULL;
}
return tbl_mgr->tbl[table_id]->updating_runtime;
}
int table_manager_update_runtime(struct table_manager *tbl_mgr, const char *table_name,
int table_id, const char *line)
int table_id, const char *line, int update_type)
{
void *schema = table_manager_get_schema(tbl_mgr, table_id);
if (NULL == schema) {
return -1;
}
void *runtime = table_manager_get_runtime(tbl_mgr, table_id);
void *runtime = NULL;
if (update_type == MAAT_UPDATE_TYPE_FULL) {
runtime = table_manager_get_updating_runtime(tbl_mgr, table_id);
} else {
runtime = table_manager_get_runtime(tbl_mgr, table_id);
}
if (NULL == runtime) {
return -1;
}
@@ -828,10 +851,10 @@ int table_manager_update_runtime(struct table_manager *tbl_mgr, const char *tabl
line, valid_column);
}
void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id)
void table_commit_updating_runtime(struct table_manager *tbl_mgr, int table_id)
{
void *runtime = table_manager_get_runtime(tbl_mgr, table_id);
if (NULL == runtime) {
void *updating_rt = table_manager_get_updating_runtime(tbl_mgr, table_id);
if (NULL == updating_rt) {
return;
}
@@ -845,7 +868,52 @@ void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id)
struct maat_table *ptable = tbl_mgr->tbl[table_id];
if ( table_ops[table_type].commit_runtime != NULL) {
table_ops[table_type].commit_runtime(runtime, ptable->table_name);;
table_ops[table_type].commit_runtime(updating_rt, ptable->table_name);
}
void *runtime = table_manager_get_runtime(tbl_mgr, table_id);
tbl_mgr->tbl[table_id]->runtime = updating_rt;
if (runtime != NULL) {
enum table_type *arg = ALLOC(enum table_type, 1);
*arg = table_type;
maat_garbage_bagging(tbl_mgr->ref_garbage_bin, runtime, arg, garbage_maat_table_runtime_free);
}
tbl_mgr->tbl[table_id]->updating_runtime = NULL;
}
void table_commit_runtime(struct table_manager *tbl_mgr, int table_id)
{
void *runtime = table_manager_get_runtime(tbl_mgr, table_id);
if (NULL == runtime) {
return;
}
enum table_type table_type = table_manager_get_table_type(tbl_mgr, table_id);
if (table_type == TABLE_TYPE_INVALID) {
log_error(tbl_mgr->logger, MODULE_TABLE,
"[%s:%d] table(table_id:%d) table_type is invalid, can't commit runtime",
__FUNCTION__, __LINE__, table_id);
return;
}
struct maat_table *ptable = tbl_mgr->tbl[table_id];
if (table_ops[table_type].commit_runtime != NULL) {
table_ops[table_type].commit_runtime(runtime, ptable->table_name);
}
}
void table_manager_commit_runtime(struct table_manager *tbl_mgr, int table_id,
int update_type)
{
if (NULL == tbl_mgr || table_id < 0) {
return;
}
if (update_type == MAAT_UPDATE_TYPE_FULL) {
table_commit_updating_runtime(tbl_mgr, table_id);
} else {
table_commit_runtime(tbl_mgr, table_id);
}
}