framework work well

This commit is contained in:
liuwentan
2022-11-25 16:32:29 +08:00
parent 2a83517894
commit 7e6d131c9e
51 changed files with 3499 additions and 3139 deletions

View File

@@ -9,92 +9,179 @@
*/
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include <limits.h>
#include <string.h>
#include <sys/prctl.h>
#include <assert.h>
#include "maat_rule.h"
#include "maat_config_monitor.h"
#include "utils.h"
#include "maat_utils.h"
#include "maat_table_runtime.h"
#include "maat_table_schema.h"
struct maat_runtime* create_maat_runtime(uint64_t version, struct maat *maat_instance)
struct maat_runtime* maat_runtime_create(long long version, struct maat *maat_instance)
{
struct maat_runtime *maat_rt = ALLOC(struct maat_runtime, 1);
maat_rt->version = version;
maat_rt->table_rt_mgr = maat_table_runtime_manager_create(maat_instance->table_mgr);
maat_rt->max_table_num = maat_table_manager_get_size(maat_instance->table_mgr);
maat_rt->table_rt_mgr = table_runtime_manager_create(maat_instance->table_schema_mgr,
maat_instance->nr_worker_thread,
maat_instance->garbage_bin);
maat_rt->max_table_num = table_schema_manager_get_size(maat_instance->table_schema_mgr);
maat_rt->max_thread_num = maat_instance->nr_worker_thread;
return maat_rt;
}
void update_maat_runtime(struct maat_runtime *maat_rt)
void maat_runtime_commit(struct maat_runtime *maat_rt)
{
for (size_t i = 0; i < maat_rt->max_table_num; i++) {
struct maat_table_runtime *table_rt = maat_table_runtime_get(maat_rt->table_rt_mgr, i);
struct table_runtime *table_rt = table_runtime_get(maat_rt->table_rt_mgr, i);
if (NULL == table_rt) {
continue;
}
enum maat_table_type table_type = maat_table_runtime_get_type(table_rt);
table_runtime_commit(table_rt, maat_rt->max_thread_num);
}
maat_rt->last_update_time = time(NULL);
}
void destroy_maat_runtime(struct maat_runtime *maat_rt)
void maat_runtime_destroy(struct maat_runtime *maat_rt)
{
if (NULL == maat_rt) {
return;
}
if (maat_rt->table_rt_mgr != NULL) {
table_runtime_manager_destroy(maat_rt->table_rt_mgr);
maat_rt->table_rt_mgr = NULL;
}
free(maat_rt);
}
void maat_start_cb(uint64_t new_version, int update_type, void *u_param)
int maat_runtime_updating_flag(struct maat_runtime *maat_rt)
{
for (size_t i = 0; i < maat_rt->max_table_num; i++) {
struct table_runtime *table_rt = table_runtime_get(maat_rt->table_rt_mgr, i);
if (NULL == table_rt) {
continue;
}
int flag = table_runtime_updating_flag(table_rt);
if (1 == flag) {
return 1;
}
}
return 0;
}
void maat_start_cb(long long new_version, int update_type, void *u_param)
{
struct maat *maat_instance = (struct maat *)u_param;
if (update_type == CONFIG_UPDATE_TYPE_FULL) {
maat_instance->rebuilding_maat_rt = create_maat_runtime(new_version, maat_instance);
if (update_type == CM_UPDATE_TYPE_FULL) {
maat_instance->creating_maat_rt = maat_runtime_create(new_version, maat_instance);
} else {
maat_instance->maat_version = new_version;
}
table_schema_manager_all_plugin_cb_start(maat_instance->table_schema_mgr, update_type);
}
int maat_update_cb(const char* table_name, const char* line, void *u_param)
int maat_update_cb(const char *table_name, const char *line, void *u_param)
{
struct maat *maat_instance =(struct maat *)u_param;
struct maat_runtime* maat_rt = NULL;
int table_id = maat_table_manager_get_table_id(maat_instance->table_mgr, table_name);
struct maat_table_schema* ptable = maat_table_manager_get_table_desc(maat_instance->table_mgr, table_id);
int table_id = table_schema_manager_get_table_id(maat_instance->table_schema_mgr, table_name);
struct table_schema* table_schema = table_schema_get(maat_instance->table_schema_mgr, table_id);
if (maat_instance->rebuilding_maat_rt != NULL) {
maat_rt = maat_instance->rebuilding_maat_rt;
if (maat_instance->creating_maat_rt != NULL) {
maat_rt = maat_instance->creating_maat_rt;
} else {
maat_rt = maat_instance->maat_rt;
}
//TODO: update rule for table_schema
update_maat_runtime();
struct table_item *table_item = table_schema_line_to_item(line, table_schema);
struct table_runtime *table_rt = table_runtime_get(maat_rt->table_rt_mgr, table_id);
table_runtime_update(table_rt, table_schema, line, table_item);
free(table_item);
return 0;
}
uint32_t maat_runtime_rule_num(struct maat_runtime *maat_rt)
{
uint32_t total = 0;
struct table_runtime *table_rt = NULL;
for (size_t i = 0; i < maat_rt->max_table_num; i++) {
table_rt = table_runtime_get(maat_rt->table_rt_mgr, i);
if (table_rt != NULL) {
total += table_runtime_rule_count(table_rt);
}
}
return total;
}
void maat_finish_cb(void *u_param)
{
struct maat *maat_instance = (struct maat *)u_param;
if (maat_instance->latest_maat_rt != NULL) {
update_maat_runtime(maat_instance->latest_maat_rt);
table_schema_manager_all_plugin_cb_finish(maat_instance->table_schema_mgr);
if (maat_instance->creating_maat_rt != NULL) {
maat_instance->creating_maat_rt->rule_num = maat_runtime_rule_num(maat_instance->creating_maat_rt);
maat_runtime_commit(maat_instance->creating_maat_rt);
fprintf(stdout, "Full config version %llu load %d entries complete",
maat_instance->creating_maat_rt->version,
maat_instance->creating_maat_rt->rule_num);
} else if (maat_instance->maat_rt != NULL) {
maat_instance->maat_rt->rule_num = maat_runtime_rule_num(maat_instance->maat_rt);
maat_instance->maat_rt->version = maat_instance->maat_version;
maat_runtime_commit(maat_instance->maat_rt);
fprintf(stdout, "Inc config version %llu load %d entries complete",
maat_instance->maat_rt->version,
maat_instance->maat_rt->rule_num);
}
//
}
void *rule_monitor_loop(void *arg)
{
char err_str[NAME_MAX] = {0};
/* Defined by prctl: The name can be up to 16 bytes long, and should
be null terminated if it contains fewer bytes. */
char maat_name[16] = {0};
struct maat *maat_instance = (struct maat *)arg;
if (strlen(maat_instance->instance_name) > 0) {
snprintf(maat_name, sizeof(maat_name), "MAAT_%s", maat_instance->instance_name);
} else {
snprintf(maat_name, sizeof(maat_name), "MAAT");
}
int ret = prctl(PR_SET_NAME, (unsigned long long)maat_name, NULL, NULL, NULL);
assert(ret >= 0);
pthread_mutex_lock(&(maat_instance->background_update_mutex));
/* if deferred load on */
if (maat_instance->deferred_load != 0) {
fprintf(stdout, "Deferred Loading ON, updating in %s.", __func__);
maat_read_full_config(maat_instance);
}
pthread_mutex_unlock(&(maat_instance->background_update_mutex));
while (maat_instance->is_running) {
usleep(1000 * 1000);
usleep(maat_instance->rule_update_checking_interval_ms * 1000);
if( 0 == pthread_mutex_trylock(&(maat_instance->background_update_mutex))) {
switch (maat_instance->rule_import_type) {
case RULE_IMPORT_TYPE_IRIS:
switch (maat_instance->input_mode) {
case DATA_SOURCE_IRIS_FILE:
config_monitor_traverse(maat_instance->maat_version,
maat_instance->iris_ctx.inc_dir,
maat_start_cb,
@@ -106,25 +193,38 @@ void *rule_monitor_loop(void *arg)
break;
}
if (maat_instance->latest_maat_rt != NULL) {
if (maat_instance->creating_maat_rt != NULL) {
struct maat_runtime *old_maat_rt = maat_instance->maat_rt;
maat_instance->maat_rt = maat_instance->latest_maat_rt;
maat_instance->maat_rt = maat_instance->creating_maat_rt;
if (old_maat_rt != NULL) {
maat_garbage_bagging(maat_instance->garbage_bin, old_maat_rt, (void (*)(void*))maat_runtime_destroy);
}
maat_instance->latest_maat_rt = NULL;
maat_instance->creating_maat_rt = NULL;
maat_instance->maat_version = maat_instance->maat_rt->version;
maat_instance->last_full_version = maat_instance->maat_rt->version;
}
if (maat_instance->maat_rt != NULL) {
update_maat_runtime(maat_instance->maat_rt);
int updating_flag = maat_runtime_updating_flag(maat_instance->maat_rt);
time_t time_window = time(NULL) - maat_instance->maat_rt->last_update_time;
if ((updating_flag > 0) && (time_window >= maat_instance->rule_effect_interval_ms / 1000)) {
maat_runtime_commit(maat_instance->maat_rt);
}
}
pthread_mutex_unlock(&(maat_instance->background_update_mutex));
}
maat_garbage_collect_routine(maat_instance->garbage_bin);
}
maat_runtime_destroy(maat_instance->maat_rt);
maat_garbage_bin_free(maat_instance->garbage_bin);
table_schema_manager_destroy(maat_instance->table_schema_mgr);
free(maat_instance);
return NULL;
}