hierarchy refactor unfinished

This commit is contained in:
liuwentan
2023-01-06 18:54:59 +08:00
parent 9778267b48
commit 3d4b833e48
18 changed files with 2314 additions and 848 deletions

View File

@@ -33,8 +33,7 @@
#define DISTRICT_ANY -1
#define DISTRICT_UNKNOWN -2
struct maat_state
{
struct maat_state {
struct maat *maat_instance;
int16_t thread_id;
unsigned char is_set_district;
@@ -44,6 +43,26 @@ struct maat_state
struct maat_hierarchy_compile_mid *compile_mid;
};
/*
 * Set of item hits produced by one scan call, filled in by
 * scan_item_hit_wrapper_build() and consumed by item_compile().
 */
struct scan_item_hit_wrapper {
/* Scan sequence number; forwarded to maat_hierarchy_compile_mid_update(). */
int Nth_scan;
/* Hit items kept after district filtering; only the first n_wrapped_item slots are valid. */
struct maat_item_inner* wrapped_items[MAX_SCANNER_HIT_NUM];
/* Number of valid entries in wrapped_items. */
size_t n_wrapped_item;
/* Optional per-item virtual table ids, indexed like wrapped_items; NULL means "use virtual_table_id". */
int *virtual_table_ids;
/* Virtual table id applied to every item when virtual_table_ids is NULL. */
int virtual_table_id;
/* Flag forwarded to maat_hierarchy_item_compile() as its is_last_item argument. */
int is_last_item;
};
/*
 * Decide whether a scan state still requires NOT-clause evaluation.
 *
 * Returns 1 only when all of the following hold, 0 otherwise:
 *   - mid is non-NULL,
 *   - mid->is_last_scan equals 1,
 *   - mid->compile_mid is set,
 *   - maat_hierarchy_compile_mid_has_NOT_clause() is non-zero for that compile mid.
 */
inline int scan_state_should_compile_NOT(struct maat_state *mid)
{
    if (!mid || mid->is_last_scan != 1 || !mid->compile_mid) {
        return 0;
    }

    if (maat_hierarchy_compile_mid_has_NOT_clause(mid->compile_mid)) {
        return 1;
    }

    return 0;
}
struct maat_options* maat_options_new(void)
{
struct maat_options *options = ALLOC(struct maat_options, 1);
@@ -59,9 +78,9 @@ struct maat_options* maat_options_new(void)
return options;
}
int maat_options_set_worker_thread_number(struct maat_options *opts, size_t n_worker_threads)
int maat_options_set_caller_thread_number(struct maat_options *opts, size_t n_thread)
{
opts->nr_worker_threads = n_worker_threads;
opts->nr_worker_threads = n_thread;
return 0;
}
@@ -101,25 +120,14 @@ int maat_options_set_deferred_load_on(struct maat_options *opts)
return 0;
}
int maat_options_set_iris_full_index_dir(struct maat_options *opts, const char *full_idx_dir)
int maat_options_set_iris(struct maat_options *opts, const char *full_directory, const char *increment_directory)
{
if (strlen(full_idx_dir) >= NAME_MAX) {
if (strlen(full_directory) >= NAME_MAX || strlen(increment_directory) >= NAME_MAX) {
return -1;
}
memcpy(opts->iris_ctx.full_idx_dir, full_idx_dir, strlen(full_idx_dir));
opts->input_mode = DATA_SOURCE_IRIS_FILE;
return 0;
}
int maat_options_set_iris_inc_index_dir(struct maat_options *opts, const char *inc_idx_dir)
{
if (strlen(inc_idx_dir) >= NAME_MAX) {
return -1;
}
memcpy(opts->iris_ctx.inc_idx_dir, inc_idx_dir, strlen(inc_idx_dir));
memcpy(opts->iris_ctx.full_idx_dir, full_directory, strlen(full_directory));
memcpy(opts->iris_ctx.inc_idx_dir, increment_directory, strlen(increment_directory));
opts->input_mode = DATA_SOURCE_IRIS_FILE;
return 0;
@@ -133,24 +141,12 @@ int maat_options_set_json_file(struct maat_options *opts, const char *json_filen
return 0;
}
int maat_options_set_redis_ip(struct maat_options *opts, const char *redis_ip)
int maat_options_set_redis(struct maat_options *opts, const char *redis_ip, uint16_t redis_port, int redis_db)
{
memcpy(opts->redis_ctx.redis_ip, redis_ip, strlen(redis_ip));
opts->input_mode = DATA_SOURCE_REDIS;
return 0;
}
int maat_options_set_redis_port(struct maat_options *opts, uint16_t redis_port)
{
opts->redis_ctx.redis_port = redis_port;
return 0;
}
int maat_options_set_redis_db(struct maat_options *opts, int db_index)
{
opts->redis_ctx.redis_db = db_index;
opts->redis_ctx.redis_db = redis_db;
opts->input_mode = DATA_SOURCE_REDIS;
return 0;
}
@@ -254,6 +250,9 @@ struct maat *maat_new(struct maat_options *opts, const char *table_info_path)
goto failed;
}
if (opts->compile_tablename != NULL) {
memcpy(maat_instance->compile_tablename, opts->compile_tablename, strlen(opts->compile_tablename));
}
maat_instance->input_mode = opts->input_mode;
switch (maat_instance->input_mode) {
case DATA_SOURCE_REDIS:
@@ -334,16 +333,6 @@ inline void maat_runtime_ref_dec(struct maat_runtime *maat_rt, int thread_id)
alignment_int64_array_add(maat_rt->ref_cnt, thread_id, -1);
}
/*
 * Report (1/0) whether NOT clauses must still be compiled for this state:
 * the state exists, its is_last_scan field is exactly 1, it owns a compile mid,
 * and maat_hierarchy_compile_mid_has_NOT_clause() is non-zero for that mid.
 */
inline int scan_state_should_compile_NOT(struct maat_state *mid)
{
    int is_candidate = (mid != NULL) && (mid->is_last_scan == 1) && (mid->compile_mid != NULL);

    /* Short-circuit keeps the helper from being called on a missing compile mid. */
    return (is_candidate && maat_hierarchy_compile_mid_has_NOT_clause(mid->compile_mid)) ? 1 : 0;
}
int maat_table_callback_register(struct maat *maat_instance, int table_id,
maat_start_callback_t *start,
maat_update_callback_t *update,
@@ -410,15 +399,14 @@ int maat_plugin_table_ex_schema_register(struct maat *maat_instance, int table_i
if (maat_instance->maat_rt != NULL) {
table_rt = table_runtime_get(maat_instance->maat_rt->table_rt_mgr, table_id);
table_runtime_commit_ex_data_schema(table_rt, table_schema, maat_instance->nr_worker_thread,
maat_instance->logger);
maat_instance->maat_rt->version, maat_instance->logger);
}
pthread_mutex_unlock(&(maat_instance->background_update_mutex));
return 0;
}
void *maat_plugin_table_dup_ex_data(struct maat *maat_instance, int table_id,
const char *key, size_t key_len)
void *maat_plugin_table_get_ex_data(struct maat *maat_instance, int table_id, const char *key, size_t key_len)
{
struct maat_runtime *maat_rt = maat_instance->maat_rt;
if (NULL == maat_rt) {
@@ -460,7 +448,7 @@ struct maat_state *make_outer_state(struct maat *maat_instance, int thread_id)
return outer_state;
}
struct maat_state *grab_mid(struct maat_state **state, struct maat *maat_instance, int thread_id, int is_hit_region)
struct maat_state *grab_mid(struct maat_state **state, struct maat *maat_instance, int thread_id, int is_hit_item)
{
struct maat_state *mid = *state;
@@ -478,7 +466,7 @@ struct maat_state *grab_mid(struct maat_state **state, struct maat *maat_instanc
alignment_int64_array_add(maat_instance->outer_mid_cnt, thread_id, 1);
}
if (is_hit_region == 1) {
if (is_hit_item == 1) {
if (NULL == mid->compile_mid) {
mid->compile_mid = maat_hierarchy_compile_mid_new(maat_instance->maat_rt->hier, thread_id);
alignment_int64_array_add(maat_instance->compile_mid_cnt, thread_id, 1);
@@ -487,6 +475,140 @@ struct maat_state *grab_mid(struct maat_state **state, struct maat *maat_instanc
return mid;
}
void scan_item_hit_wrapper_build(struct scan_item_hit_wrapper* wrapper, struct scan_result* results, size_t n_result,
int district_id, int is_last_item, int virtual_table_id, int Nth_scan)
{
size_t i=0;
struct maat_item_inner *item = NULL;
wrapper->n_wrapped_item = 0;
wrapper->virtual_table_id = 0;
wrapper->virtual_table_ids = NULL;
for (i = 0; i < n_result; i++) {
item = (struct maat_item_inner *)(results[i].tag);
if (item->district_id == district_id || district_id == DISTRICT_ANY) {
wrapper->wrapped_items[wrapper->n_wrapped_item] = item;
wrapper->n_wrapped_item++;
}
}
wrapper->is_last_item = is_last_item;
wrapper->virtual_table_id = virtual_table_id;
wrapper->Nth_scan = Nth_scan;
wrapper->virtual_table_ids = NULL;
}
struct compile_sort_para {
double evaluation_order;
int declared_clause_num;
int compile_id;
void *user;
};
static void compile_sort_para_set(struct compile_sort_para *para,
const struct maat_compile_rule *compile_relation, void *user)
{
para->compile_id=compile_relation->compile_id;
para->evaluation_order=compile_relation->evaluation_order;
para->declared_clause_num=compile_relation->declared_clause_num;
para->user = user;
}
/*
 * Three-way comparison of two compile-rule sort keys (negative: a first,
 * positive: b first, zero: equal).
 *
 * Ordering rules, in priority order:
 *   1. Both evaluation orders non-zero: the smaller evaluation order comes first.
 *   2. Exactly one evaluation order is zero: the larger evaluation order comes first.
 *   3. Otherwise (both zero, or equal non-zero): fewer declared clauses come first.
 *   4. Same clause count: the larger compile_id comes first.
 *
 * The integer tie-breakers are expressed as comparisons rather than
 * subtractions, because `x - y` on int overflows (undefined behaviour) for
 * operands far apart; only the sign of the result matters to a sort.
 */
static int compile_sort_para_compare(const struct compile_sort_para *a, const struct compile_sort_para *b)
{
    if (a->evaluation_order != 0 && b->evaluation_order != 0) {
        if (a->evaluation_order < b->evaluation_order) {
            return -1;
        }
        if (a->evaluation_order > b->evaluation_order) {
            return 1;
        }
        /* Equal non-zero orders: fall through to the tie-breakers. */
    } else if (a->evaluation_order + b->evaluation_order != 0) {
        /* Exactly one side specified an order: the bigger value wins. */
        return (a->evaluation_order > b->evaluation_order) ? -1 : 1;
    }

    if (a->declared_clause_num != b->declared_clause_num) {
        return (a->declared_clause_num > b->declared_clause_num) -
               (a->declared_clause_num < b->declared_clause_num);
    }

    /* Descending by compile_id. */
    return (b->compile_id > a->compile_id) - (b->compile_id < a->compile_id);
}
/*
 * qsort() comparator for an array of `struct maat_compile_rule *`.
 *
 * a, b - pointers to array elements, i.e. pointers to rule pointers.
 * Returns the result of compile_sort_para_compare() on the keys of the two rules.
 *
 * The element type in the casts must be `struct maat_compile_rule`; the
 * previous spelling `struct Maat_compile_rule` named a different, incompatible
 * type.
 */
static int compare_compile_rule(const void *a, const void *b)
{
    const struct maat_compile_rule *ra = *(const struct maat_compile_rule *const *)a;
    const struct maat_compile_rule *rb = *(const struct maat_compile_rule *const *)b;

    struct compile_sort_para sa, sb;
    compile_sort_para_set(&sa, ra, NULL);
    compile_sort_para_set(&sb, rb, NULL);

    return compile_sort_para_compare(&sa, &sb);
}
int item_compile(struct maat *maat_instance, struct maat_hierarchy_compile_mid *compile_mid,
const struct scan_item_hit_wrapper *item_hit_wrapper, int *result,
int size, int thread_id)
{
int is_last_item = item_hit_wrapper->is_last_item;
size_t item_hit_num = item_hit_wrapper->n_wrapped_item;
int scan_ret=0;
int i=0;
struct maat_compile_rule *compile_rule_array[size];
struct maat_compile_rule *compile_rule = NULL;
int virtual_table_id = 0;
struct maat_item_inner *item = NULL;
for (i = 0; (size_t)i < item_hit_num;i++) {
item = item_hit_wrapper->wrapped_items[i];
assert(item->magic_num == ITEM_RULE_MAGIC);
if (item_hit_wrapper->virtual_table_ids) {
virtual_table_id = item_hit_wrapper->virtual_table_ids[i];
} else {
virtual_table_id = item_hit_wrapper->virtual_table_id;
}
maat_hierarchy_compile_mid_update(feather->scanner->hier, compile_mid, item->item_id, virtual_table_id, item_hit_wrapper->Nth_scan, i);
}
scan_ret = maat_hierarchy_item_compile(feather->scanner->hier, compile_mid, is_last_item, (void **)compile_rule_array, size);
//Maat_hierarchy is rwlock protected, it always returns non-NULL compile_rule.
if (scan_ret > 1) {
qsort(compile_rule_array, scan_ret, sizeof(struct maat_compile_rule *),
compare_compile_rule);
}
for (i = 0; i < scan_ret && i < size; i++) {
compile_rule = compile_rule_array[i];
assert(compile_rule->magic_num == COMPILE_RULE_MAGIC);
result[i] = compile_rule->compile_id;
/*
fill_maat_rule(&(result[i]), &(compile_rule->head), compile_rule->service_defined,
compile_rule->head.serv_def_len);*/
}
if (scan_ret > 0) {
alignment_int64_array_add(feather->hit_cnt, thread_id, 1);
}
if (item_hit_num == 0 && scan_ret > 0) {
alignment_int64_array_add(feather->not_grp_hit_cnt, thread_id, 1);
}
return MIN(scan_ret, size);
}
int maat_scan_integer(struct maat *instance, int table_id, int thread_id,
unsigned int intval, int results[], size_t *n_result,
struct maat_state **state)
@@ -494,19 +616,17 @@ int maat_scan_integer(struct maat *instance, int table_id, int thread_id,
return 0;
}
static int ip_scan_data_set(struct table_rt_2tuple *table_rt_addr, struct addr_4tuple *addr, enum component_table_type child_type)
static int ip_scan_data_set(struct ip_addr *scan_data, struct addr_2tuple *addr, enum component_table_type child_type)
{
switch (addr->type) {
case IP_TYPE_V4:
table_rt_addr->ip_type = IP_TYPE_V4;
scan_data->ip_type = IP_TYPE_V4;
switch (child_type) {
case COMPONENT_TABLE_TYPE_SIP:
table_rt_addr->ipv4 = ntohl(addr->ipv4.sip);
table_rt_addr->port = ntohs(addr->ipv4.sport);
scan_data->ipv4 = ntohl(addr->ipv4.sip);
break;
case COMPONENT_TABLE_TYPE_DIP:
table_rt_addr->ipv4 = ntohl(addr->ipv4.dip);
table_rt_addr->port = ntohs(addr->ipv4.dport);
scan_data->ipv4 = ntohl(addr->ipv4.dip);
break;
default:
assert(0);
@@ -514,17 +634,15 @@ static int ip_scan_data_set(struct table_rt_2tuple *table_rt_addr, struct addr_4
}
break;
case IP_TYPE_V6:
table_rt_addr->ip_type = IP_TYPE_V6;
scan_data->ip_type = IP_TYPE_V6;
switch (child_type) {
case COMPONENT_TABLE_TYPE_SIP:
memcpy(table_rt_addr->ipv6, addr->ipv6.sip, sizeof(addr->ipv6.sip));
ipv6_ntoh(table_rt_addr->ipv6);
table_rt_addr->port = ntohs(addr->ipv6.sport);
memcpy(scan_data->ipv6, addr->ipv6.sip, sizeof(addr->ipv6.sip));
ipv6_ntoh(scan_data->ipv6);
break;
case COMPONENT_TABLE_TYPE_DIP:
memcpy(table_rt_addr->ipv6, addr->ipv6.dip, sizeof(addr->ipv6.dip));
ipv6_ntoh(table_rt_addr->ipv6);
table_rt_addr->port = ntohs(addr->ipv6.dport);
memcpy(scan_data->ipv6, addr->ipv6.dip, sizeof(addr->ipv6.dip));
ipv6_ntoh(scan_data->ipv6);
break;
default:
assert(0);
@@ -539,12 +657,12 @@ static int ip_scan_data_set(struct table_rt_2tuple *table_rt_addr, struct addr_4
return 0;
}
static int ip_composition_scan(int thread_id, struct addr_4tuple *addr,
static int ip_composition_scan(int thread_id, struct addr_2tuple *addr,
int parent_table_id, enum component_table_type child_type,
int *virtual_table_id,
struct table_schema_manager *table_schema_mgr,
struct table_runtime_manager *table_rt_mgr,
struct scan_result *region_results, size_t n_result_array)
struct scan_result *item_results, size_t n_result_array)
{
int child_table_id = 0;
@@ -576,12 +694,12 @@ static int ip_composition_scan(int thread_id, struct addr_4tuple *addr,
return 0;
}
struct table_rt_2tuple scan_data;
memset(&scan_data, 0, sizeof(struct table_rt_2tuple));
struct ip_addr scan_data;
memset(&scan_data, 0, sizeof(struct ip_addr));
ip_scan_data_set(&scan_data, addr, child_type);
size_t hit_cnt = 0;
int ret = table_runtime_scan_ip(table_rt, thread_id, &scan_data, region_results, &hit_cnt, n_result_array);
int ret = table_runtime_scan_ip(table_rt, thread_id, &scan_data, item_results, &hit_cnt, n_result_array);
if (ret < 0) {
return -1;
}
@@ -589,7 +707,7 @@ static int ip_composition_scan(int thread_id, struct addr_4tuple *addr,
return hit_cnt;
}
int maat_scan_ip(struct maat *maat_instance, int table_id, int thread_id, struct addr_4tuple *addr,
int maat_scan_ip(struct maat *maat_instance, int table_id, int thread_id, struct addr_2tuple *addr,
int results[], size_t *n_result, struct maat_state **state)
{
if ((NULL == maat_instance) || (table_id < 0) || (table_id >= MAX_TABLE_NUM) ||
@@ -615,49 +733,70 @@ int maat_scan_ip(struct maat *maat_instance, int table_id, int thread_id, struct
return -1;
}
int region_ret = 0;
int item_ret = 0;
int virtual_table_id = 0;
struct scan_result *region_result = maat_rt->region_result_buff + thread_id * MAX_SCANNER_HIT_NUM;
int region_hit_cnt = 0;
int region_result_virtual_table_ids[MAX_SCANNER_HIT_NUM];
struct scan_result *item_result = maat_rt->item_result_buff + thread_id * MAX_SCANNER_HIT_NUM;
int item_hit_cnt = 0;
int item_result_virtual_table_ids[MAX_SCANNER_HIT_NUM];
alignment_int64_array_add(maat_instance->thread_call_cnt, thread_id, 1);
maat_runtime_ref_inc(maat_rt, thread_id);
if (table_type == TABLE_TYPE_COMPOSITION) {
region_ret = ip_composition_scan(thread_id, addr, table_id, COMPONENT_TABLE_TYPE_SIP, &virtual_table_id,
/*
item_ret = ip_composition_scan(thread_id, addr, table_id, COMPONENT_TABLE_TYPE_SIP, &virtual_table_id,
maat_instance->table_schema_mgr, maat_instance->maat_rt->table_rt_mgr,
region_result + region_hit_cnt, MAX_SCANNER_HIT_NUM - region_hit_cnt);
region_hit_cnt += region_ret;
/*enum component_table_type childs[3] = {COMPONENT_TABLE_TYPE_SIP, COMPONENT_TABLE_TYPE_DIP, COMPONENT_TABLE_TYPE_SESSION};
item_result + item_hit_cnt, MAX_SCANNER_HIT_NUM - item_hit_cnt);
item_hit_cnt += item_ret; */
enum component_table_type childs[3] = {COMPONENT_TABLE_TYPE_SIP, COMPONENT_TABLE_TYPE_DIP, COMPONENT_TABLE_TYPE_SESSION};
for (int i = 0; i < 3; i++) {
region_ret = ip_composition_scan(thread_id, addr, table_id, childs[i], &virtual_table_id,
item_ret = ip_composition_scan(thread_id, addr, table_id, childs[i], &virtual_table_id,
maat_instance->table_schema_mgr, maat_instance->maat_rt->table_rt_mgr,
region_result + region_hit_cnt, MAX_SCANNER_HIT_NUM - region_hit_cnt);
if (region_ret < 0) {
item_result + item_hit_cnt, MAX_SCANNER_HIT_NUM - item_hit_cnt);
if (item_ret < 0) {
maat_instance->scan_err_cnt++;
} else {
for (int j = 0; j < region_ret; j++) {
region_result_virtual_table_ids[region_hit_cnt++] = virtual_table_id;
for (int j = 0; j < item_ret; j++) {
item_result_virtual_table_ids[item_hit_cnt++] = virtual_table_id;
}
}
}*/
}
} else {
region_ret = ip_composition_scan(thread_id, addr, table_id, COMPONENT_TABLE_TYPE_NONE, &virtual_table_id,
item_ret = ip_composition_scan(thread_id, addr, table_id, COMPONENT_TABLE_TYPE_NONE, &virtual_table_id,
maat_instance->table_schema_mgr, maat_instance->maat_rt->table_rt_mgr,
region_result + region_hit_cnt, MAX_SCANNER_HIT_NUM - region_hit_cnt);
if (region_ret < 0) {
item_result + item_hit_cnt, MAX_SCANNER_HIT_NUM - item_hit_cnt);
if (item_ret < 0) {
maat_instance->scan_err_cnt++;
} else {
region_hit_cnt += region_ret;
item_hit_cnt += item_ret;
}
}
*n_result = region_ret;
for (int i = 0; i < region_ret; i++) {
results[i] = region_result[i].rule_id;
}
int compile_ret = 0;
struct scan_item_hit_wrapper item_hit_wrapper;
if (item_hit_cnt > 0 || scan_state_should_compile_NOT(mid)) {
mid = grab_mid(state, maat_instance, thread_id, 1);
scan_item_hit_wrapper_build(&item_hit_wrapper, item_result, item_hit_cnt, -1,
mid->is_last_scan, virtual_table_id, mid->scan_cnt);
if (table_type == TABLE_TYPE_COMPOSITION) {
item_hit_wrapper.virtual_table_ids = item_result_virtual_table_ids;
}
return 0;
compile_ret = item_compile(maat_instance, mid->compile_mid,
&item_hit_wrapper,
results, item_hit_cnt,
thread_id);
assert(mid->is_last_scan < 2);
if (mid->is_last_scan == 1) {
mid->is_last_scan = 2;
}
}
maat_runtime_ref_dec(maat_rt, thread_id);
if (compile_ret == 0 && item_hit_cnt > 0) {
return -2;
}
*n_result = compile_ret;
return compile_ret;
}
int maat_scan_string(struct maat *maat_instance, int table_id, int thread_id,
@@ -705,7 +844,12 @@ int maat_state_get(struct maat *instance, struct maat_state **mid, enum maat_sca
}
void maat_state_reset(struct maat_state **state)
/*
 * Placeholder: the body is empty, so calling this currently has no effect on
 * *state. TODO: implement the actual release of the scan state.
 */
void maat_state_free(struct maat_state **state)
{
}
/*
 * Placeholder for the matched-compile-id lookup (not implemented yet).
 *
 * instance - unused for now.
 * matched  - unused for now.
 *
 * Always returns 0. The previous empty body fell off the end of a non-void
 * function, which is undefined behaviour as soon as a caller reads the result.
 * TODO: replace with the real lookup once the hierarchy refactor is finished.
 */
int maat_matched_compile_id(struct maat *instance, struct maat_matched *matched)
{
    (void)instance;
    (void)matched;

    return 0;
}