重构Maat table相关代码。

This commit is contained in:
zhengchao
2019-07-25 14:49:11 +06:00
parent c189b90e6d
commit 2909cb1997
10 changed files with 562 additions and 439 deletions

View File

@@ -34,8 +34,6 @@
int MAAT_FRAME_VERSION_2_7_20190629=1;
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",
"unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""};
int is_valid_expr_type(enum MAAT_EXPR_TYPE expr_type)
{
@@ -67,8 +65,8 @@ int is_valid_match_method(enum MAAT_MATCH_METHOD match_method)
iconv_t maat_iconv_open(struct Maat_scanner* scanner,enum MAAT_CHARSET to,enum MAAT_CHARSET from)
{
const char *from_s=CHARSET_STRING[from];
const char *to_s=CHARSET_STRING[to];
const char *from_s=charset_get_name(from);
const char *to_s=charset_get_name(to);
iconv_t cd;
if(from==CHARSET_GBK&&to==CHARSET_BIG5)
{
@@ -523,323 +521,6 @@ void rule_ex_data_free(const struct Maat_rule_head * rule_head, const char* srv_
}
int read_expr_table_info(const char* line, struct Maat_table_desc* table, MESA_htable_handle string2int_map)
{
int j=0,ret[4]={0};
char table_type[16],src_charset[256],dst_charset[256],merge[4],quick_str_scan[32]={0};
char *token=NULL,*sub_token=NULL,*saveptr;
struct expr_table_desc* p=&(table->expr);
sscanf(line,"%d\t%s\t%s\t%s\t%s\t%s\t%d\t%s",&(table->table_id)
,table->table_name[0]
,table_type
,src_charset
,dst_charset
,merge
,&(p->cross_cache_size)
,quick_str_scan);
memset(ret,0,sizeof(ret));
ret[0]=map_str2int(string2int_map,str_tolower(table_type),(int*)&(table->table_type));
ret[1]=map_str2int(string2int_map,str_tolower(src_charset),(int*)&(p->src_charset));
ret[2]=map_str2int(string2int_map,str_tolower(merge),&(p->do_charset_merge));
if(strlen(quick_str_scan)>0)
{
ret[3]=map_str2int(string2int_map,str_tolower(quick_str_scan),&(p->quick_expr_switch));
}
memset(quick_str_scan,0,sizeof(quick_str_scan));
for(j=0;j<4;j++)
{
if(ret[j]<0)
{
return -1;
}
}
j=0;
for (token = dst_charset; ; token= NULL)
{
sub_token= strtok_r(token,"/", &saveptr);
if (sub_token == NULL)
break;
ret[3]=map_str2int(string2int_map,str_tolower(sub_token),(int*)&(p->dst_charset[j]));
if(ret[3]>0)
{
if(p->dst_charset[j]==p->src_charset)
{
p->src_charset_in_dst=TRUE;
}
j++;
}
else
{
return -1;
}
}
return 0;
}
Maat_table_desc* table_info_new(int max_thread_num)
{
struct Maat_table_desc*p=ALLOC(struct Maat_table_desc, 1);
p->conj_cnt=1;
return p;
}
void table_info_free(struct Maat_table_desc*p)
{
free(p);
return;
}
int _read_integer_arrary(char* string, int *array, int size)
{
char *token=NULL,*sub_token=NULL,*saveptr;
int i=0;
for (token = string, i=0; i<size ; token= NULL, i++)
{
sub_token= strtok_r(token,",", &saveptr);
if (sub_token == NULL)
break;
sscanf(sub_token, "%d", array+i);
}
return i;
}
#define COLUMN_PLUGIN_DESCR_JSON 4
int read_plugin_table_description(const char* line, struct Maat_table_desc* p)
{
int i=0,ret=0;
size_t offset=0, len=0;
cJSON* json=NULL, *tmp=NULL, *array_item=NULL;
char* copy_line=NULL, *plug_info=NULL;
struct plugin_table_desc* plugin_desc=&(p->plugin);
copy_line=_maat_strdup(line);
ret=get_column_pos(copy_line, COLUMN_PLUGIN_DESCR_JSON, &offset, &len);
if(i<0)
{
goto error_out;
}
if(offset+len<strlen(copy_line))
{
copy_line[offset+len+1]='\0';
}
plug_info=copy_line+offset;
if(NULL==strchr(plug_info,'{'))//For old version compatible.
{
ret=sscanf(plug_info, "%d", &(plugin_desc->valid_flag_column));
if(ret==0||ret==EOF)
{
plugin_desc->valid_flag_column=-1;
}
free(copy_line);
return 0;
}
json=cJSON_Parse(plug_info);
if(!json)
{
goto error_out;
}
tmp=cJSON_GetObjectItem(json, "key");
if(tmp!=NULL)
{
assert(tmp->type==cJSON_Number);
plugin_desc->key_column=tmp->valueint;
}
tmp=cJSON_GetObjectItem(json, "valid");
if(tmp!=NULL)
{
assert(tmp->type==cJSON_Number);
plugin_desc->valid_flag_column=tmp->valueint;
}
tmp=cJSON_GetObjectItem(json, "tag");
if(tmp!=NULL)
{
assert(tmp->type==cJSON_Number);
plugin_desc->rule_tag_column=tmp->valueint;
}
tmp=cJSON_GetObjectItem(json, "estimate_size");
if(tmp!=NULL)
{
assert(tmp->type==cJSON_Number);
plugin_desc->estimate_size=tmp->valueint;
}
tmp=cJSON_GetObjectItem(json, "foreign");
if(tmp!=NULL)
{
if(tmp->type==cJSON_String)
{
plugin_desc->n_foreign=_read_integer_arrary(tmp->valuestring, plugin_desc->foreign_columns, MAX_FOREIGN_CLMN_NUM);
}
else if(tmp->type==cJSON_Array)
{
plugin_desc->n_foreign= cJSON_GetArraySize(tmp);
for(i=0;i<plugin_desc->n_foreign; i++)
{
array_item=cJSON_GetArrayItem(tmp, i);
assert(array_item->type==cJSON_Number);
plugin_desc->foreign_columns[i]=array_item->valueint;
}
}
}
cJSON_Delete(json);
free(copy_line);
return 0;
error_out:
free(copy_line);
return -1;
}
int read_table_description(struct Maat_table_desc** p_table_info,int num,const char* table_info_path,int max_thread_num,void* logger)
{
FILE*fp=NULL;
char line[MAX_TABLE_LINE_SIZE];
int i=0,ret=0,table_cnt=0;
char table_type_str[16]={0},not_care[1024]={0}, tmp_str[32]={0};
MESA_htable_handle string2int_map=NULL;;
struct Maat_table_desc*p=NULL;
struct Maat_table_desc*conj_table=NULL;
fp=fopen(table_info_path,"r");
if(fp==NULL)
{
fprintf(stderr,"Maat read table info %s error.\n",table_info_path);
MESA_handle_runtime_log(logger, RLOG_LV_FATAL,maat_module,
"Maat read table info %s failed: %s.\n", table_info_path, strerror(errno));
return 0;
}
string2int_map=map_create();
map_register(string2int_map,"expr", TABLE_TYPE_EXPR);
map_register(string2int_map,"ip", TABLE_TYPE_IP);
map_register(string2int_map,"ip_plus", TABLE_TYPE_IP_PLUS);
map_register(string2int_map,"compile", TABLE_TYPE_COMPILE);
map_register(string2int_map,"plugin", TABLE_TYPE_PLUGIN);
map_register(string2int_map,"intval", TABLE_TYPE_INTERVAL);
map_register(string2int_map,"digest", TABLE_TYPE_DIGEST);
map_register(string2int_map,"expr_plus", TABLE_TYPE_EXPR_PLUS);
map_register(string2int_map,"group", TABLE_TYPE_GROUP);
map_register(string2int_map,"similar", TABLE_TYPE_SIMILARITY);
map_register(string2int_map,"quickoff",0);
map_register(string2int_map,"quickon",1);
map_register(string2int_map,"escape",USER_REGION_ENCODE_ESCAPE);
// map_register(string2int_map,"base64",USER_REGION_ENCODE_BASE64); //NOT supported yet
for(i=0;i<MAX_CHARSET_NUM;i++)
{
if(strlen(CHARSET_STRING[i])>0)
{
map_register(string2int_map,CHARSET_STRING[i], i);
}
else
{
break;
}
}
map_register(string2int_map,"yes", 1);
map_register(string2int_map,"no", 0);
i=0;
while(NULL!=fgets(line,sizeof(line),fp))
{
i++;
if(line[0]=='#'||line[0]==' '||line[0]=='\t'||strlen(line)<4)
{
continue;
}
p=table_info_new(max_thread_num);
ret=sscanf(line,"%d\t%s\t%s\t%[a-z0-9\t ]",&(p->table_id)
,p->table_name[0]
,table_type_str
,not_care);
if(ret<3)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL,maat_module,
"Maat read table info %s line %d error: not enough column.",table_info_path,i);
continue;
}
ret=map_str2int(string2int_map,str_tolower(table_type_str),(int*)&(p->table_type));
if(ret<0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL,maat_module,
"Maat read table info %s line %d error:invalid table type.",table_info_path,i);
goto invalid_table;
}
switch(p->table_type)
{
case TABLE_TYPE_EXPR:
case TABLE_TYPE_EXPR_PLUS:
ret=read_expr_table_info(line, p, string2int_map);
if(ret<0)
{
fprintf(stderr,"Maat read table info %s line %d error:illegal column.\n",table_info_path,i);
MESA_handle_runtime_log(logger, RLOG_LV_FATAL,maat_module,
"Maat read table info %s line %d error:illegal column.",table_info_path,i);
goto invalid_table;
}
break;
case TABLE_TYPE_PLUGIN:
ret=read_plugin_table_description(line, p);
if(ret<0)
{
fprintf(stderr,"Maat read table info %s line %d error:illegal plugin info.\n",table_info_path,i);
MESA_handle_runtime_log(logger, RLOG_LV_FATAL,maat_module,
"Maat read table info %s line %d error:illegal plugin info.",table_info_path,i);
goto invalid_table;
}
break;
case TABLE_TYPE_COMPILE:
ret=sscanf(not_care,"%[a-z0-9]",tmp_str);
if(ret>0)
{
ret=map_str2int(string2int_map,str_tolower(tmp_str),(int*)&(p->compile.user_region_encoding));
}
if(ret!=1)
{
p->compile.user_region_encoding=USER_REGION_ENCODE_NONE;
}
default:
break;
}
if(p->table_id>=num)
{
fprintf(stderr,"Maat read table info %s:%d error: table id %uh > %d.\n",table_info_path,i,p->table_id,num);
MESA_handle_runtime_log(logger, RLOG_LV_FATAL,maat_module,
"Maat read table info %s line %d error: table id %uh > %d.\n",table_info_path,i,p->table_id,num);
goto invalid_table;
}
if(p_table_info[p->table_id]!=NULL)//duplicate table_id,means conjunction table;
{
conj_table=p_table_info[p->table_id];
if(conj_table->conj_cnt==MAX_CONJUNCTION_TABLE_NUM)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_module,
"Maat read table info %s line %d error:reach tableid %d conjunction upper limit."
,table_info_path,i,p->table_id);
goto invalid_table;
}
memcpy(conj_table->table_name[conj_table->conj_cnt],p->table_name[0],MAX_TABLE_NAME_LEN);
conj_table->conj_cnt++;
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_module,
"Maat read table info %s:%d:conjunction %s with %s (id=%d,total=%d)."
,table_info_path,i,p->table_name[0]
,conj_table->table_name[0],conj_table->table_id,conj_table->conj_cnt);
//use goto to free the conjunctioned table_info
goto invalid_table;
}
p_table_info[p->table_id]=p;
table_cnt++;
continue;
invalid_table:
table_info_free(p);
p=NULL;
}
fclose(fp);
map_destroy(string2int_map);
return table_cnt;
}
struct Maat_group_inner* create_group_rule(int group_id, int table_id, struct Maat_scanner *scanner)
{
int ret=0;
@@ -2222,10 +1903,11 @@ int add_expr_rule(struct Maat_table_desc* table,struct db_str_rule_t* db_rule,st
if(ret<0)
{
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_module ,
"Table %s region cfg %d charset convert from %s to %s failed.",table->table_name
,db_rule->region_id
,CHARSET_STRING[expr_desc->src_charset]
,CHARSET_STRING[dst_charset]);
"Table %s region cfg %d charset convert from %s to %s failed.",
table->table_name,
db_rule->region_id,
charset_get_name(expr_desc->src_charset),
charset_get_name(dst_charset));
free(region_string);
op_expr->convert_failed++;
expr_desc->iconv_err_cnt++;
@@ -2736,7 +2418,7 @@ void update_group_rule(struct Maat_table_desc* table,const char* table_line,stru
return;
}
void update_expr_rule(struct Maat_table_desc* table,const char* table_line,struct Maat_scanner *scanner,void* logger,int group_mode_on)
void update_expr_rule(struct Maat_table_desc* table,const char* table_line,struct Maat_scanner *scanner,void* logger)
{
struct db_str_rule_t* maat_str_rule=ALLOC(struct db_str_rule_t, 1);
int ret=0,db_hexbin=0,rule_type=0;
@@ -3024,7 +2706,7 @@ int ip_format2range(int ip_type, enum MAAT_IP_FORMAT format, const char* ip1, co
}
return 0;
}
void update_ip_rule(struct Maat_table_desc* table, const char* table_line, struct Maat_scanner *scanner, void* logger, int group_mode_on)
void update_ip_rule(struct Maat_table_desc* table, const char* table_line, struct Maat_scanner *scanner, void* logger)
{
struct db_ip_rule_t* ip_rule=(struct db_ip_rule_t*)calloc(sizeof(struct db_ip_rule_t),1);
char src_ip1[40]={0}, src_ip2[40]={0}, dst_ip1[40]={0}, dst_ip2[40]={0};
@@ -3233,7 +2915,7 @@ error_out:
ip_rule=NULL;
}
void update_intval_rule(struct Maat_table_desc* table,const char* table_line,struct Maat_scanner *scanner,void* logger,int group_mode_on)
void update_intval_rule(struct Maat_table_desc* table, const char* table_line, struct Maat_scanner *scanner, void* logger)
{
struct db_intval_rule* intval_rule=ALLOC(struct db_intval_rule, 1);
struct Maat_table_runtime* table_rt=scanner->table_rt[table->table_id];
@@ -3385,7 +3067,7 @@ error_out:
return;
}
void update_digest_rule(struct Maat_table_desc* table,const char* table_line,struct Maat_scanner *scanner,void* logger,int group_mode_on)
void update_digest_rule(struct Maat_table_desc* table, const char* table_line, struct Maat_scanner *scanner, void* logger)
{
struct Maat_table_runtime* table_rt=scanner->table_rt[table->table_id];
struct db_digest_rule* digest_rule=ALLOC(struct db_digest_rule, 1);
@@ -4008,18 +3690,18 @@ int maat_update_cb(const char* table_name,const char* line,void *u_para)
{
case TABLE_TYPE_EXPR:
case TABLE_TYPE_EXPR_PLUS:
update_expr_rule(feather->p_table_info[table_id], line, scanner,feather->logger,feather->GROUP_MODE_ON);
update_expr_rule(feather->p_table_info[table_id], line, scanner, feather->logger);
break;
case TABLE_TYPE_IP:
case TABLE_TYPE_IP_PLUS:
update_ip_rule(feather->p_table_info[table_id], line, scanner,feather->logger,feather->GROUP_MODE_ON);
update_ip_rule(feather->p_table_info[table_id], line, scanner, feather->logger);
break;
case TABLE_TYPE_INTERVAL:
update_intval_rule(feather->p_table_info[table_id], line, scanner,feather->logger,feather->GROUP_MODE_ON);
update_intval_rule(feather->p_table_info[table_id], line, scanner,feather->logger);
break;
case TABLE_TYPE_DIGEST:
case TABLE_TYPE_SIMILARITY:
update_digest_rule(feather->p_table_info[table_id], line, scanner,feather->logger,feather->GROUP_MODE_ON);
update_digest_rule(feather->p_table_info[table_id], line, scanner,feather->logger);
break;
case TABLE_TYPE_COMPILE:
update_compile_rule(feather->p_table_info[table_id], line, scanner, feather->accept_tags, feather->n_tags, feather->logger);
@@ -4207,16 +3889,8 @@ void *thread_rule_monitor(void *arg)
garbage_bury(feather->garbage_q,0,feather->logger);
assert(0==MESA_lqueue_get_count(feather->garbage_q));
MESA_lqueue_destroy(feather->garbage_q,lqueue_destroy_cb,NULL);
int i=0;
for(i=0;i<MAX_TABLE_NUM;i++)
{
if(feather->p_table_info[i]==NULL)
{
continue;
}
table_info_free(feather->p_table_info[i]);
feather->p_table_info[i]=NULL;
}
Maat_table_clear(feather->p_table_info, MAX_TABLE_NUM);
alignment_int64_array_free(feather->thread_call_cnt);
alignment_int64_array_free(feather->inner_mid_cnt);
alignment_int64_array_free(feather->outer_mid_cnt);
@@ -4235,6 +3909,7 @@ void *thread_rule_monitor(void *arg)
feather->mr_ctx.write_ctx=NULL;
}
}
int i=0;
for(i=0; i<feather->n_tags; i++)
{
free(feather->accept_tags[i].tag_name);