代码适配Maat command、maat json。

This commit is contained in:
zhengchao
2020-06-13 21:05:42 +08:00
parent 7e1cb56d4f
commit 2c80ba4c0a
22 changed files with 935 additions and 1513 deletions

View File

@@ -244,20 +244,22 @@ int invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq)
line[i]='0';
return 0;
}
void serialize_group(const struct Maat_group_t* p_group, enum MAAT_OPERATION op, char* buff, size_t sz)
void serialize_group2group(enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g, char* buff, size_t sz)
{
if(op==MAAT_OP_RENEW_TIMEOUT) op=MAAT_OP_ADD;
const char* vt_name="null";
if(p_group->virtual_table_name!=NULL)
{
vt_name=p_group->virtual_table_name;
}
snprintf(buff, sz, "%d\t%d\t%d\t%d\t%d\t%s", p_group->group_id,
p_group->parent_id,
op,
p_group->not_flag,
p_group->parent_type,
vt_name);
snprintf(buff, sz, "%d\t%d\t%d", g2g->group_id,
g2g->superior_group_id,
op);
return;
}
void serialize_group2compile(enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c, char* buff, size_t sz)
{
snprintf(buff, sz, "%d\t%d\t%d\t%d\t%s\t%d", g2c->group_id,
g2c->compile_id,
op,
g2c->not_flag,
g2c->virtual_table_name?g2c->virtual_table_name:"null",
g2c->Nth_clause==0?1:g2c->Nth_clause);
return;
}
void serialize_compile(const struct Maat_rule_t* p_m_rule, const char* huge_service_defined, int group_num, enum MAAT_OPERATION op, char* buff, size_t sz)
@@ -276,7 +278,7 @@ void serialize_compile(const struct Maat_rule_t* p_m_rule, const char* huge_serv
group_num);
return;
}
void serialize_region(const struct Maat_region_t* p, int group_id, char* buff, size_t sz)
void serialize_region(const struct Maat_cmd_region* p, int group_id, char* buff, size_t sz)
{
UNUSED size_t ret=0;
switch(p->region_type)
@@ -842,174 +844,7 @@ int get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rul
}
return 0;
}
int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cnt, int* new_group_cnt)
{
int serial_num=0;
int i=0;
struct Maat_cmd_t* cmd=&(_cmd->cmd);
serial_num++;//compile rule
for(i=0;i<cmd->group_num;i++)
{
serial_num++;
if(cmd->groups[i].regions==NULL)
{
continue;
}
if(_cmd->op==MAAT_OP_ADD)
{
*new_region_cnt+=cmd->groups[i].region_num;
(*new_group_cnt)++;
}
//for MAAT_OP_DEL, if the group's refcnt>1, group->region_num=0.
//so it's OK to add.
serial_num+=cmd->groups[i].region_num;
}
return serial_num;
}
int reconstruct_cmd(struct _Maat_feather_t *feather, struct _Maat_cmd_inner_t* _cmd)
{
int i=0,j=0,grp_idx=0;
struct Maat_cmd_t* cmd=&(_cmd->cmd);
struct Maat_group_t* group_cmd=NULL;
struct Maat_region_t* region_cmd=NULL;
struct Maat_compile_inner *relation=NULL;
struct Maat_group_inner* group_inner=NULL;
struct Maat_region_inner* region_inner=NULL;
void* logger=feather->logger;
int config_id=cmd->compile.config_id;
if(feather->scanner==NULL)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_command
,"MAAT not ready.");
return -1;
}
relation=(struct Maat_compile_inner *)HASH_fetch_by_id(feather->scanner->compile_hash, config_id);
//Operation on relation is thread safe, no immediate memory free when delete a compile rule or a scanner.
//In another words, if the relation is accessable from compile means, its was valid in at least 10 seconds (garbage bury).
if(relation==NULL)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_command
,"config %d not exist."
,config_id);
return -1;
}
pthread_rwlock_rdlock(&(relation->rwlock));
cmd->group_num=relation->group_cnt;
assert(cmd->groups==NULL);
cmd->groups=ALLOC(struct Maat_group_t, cmd->group_num);
for(i=0;i<relation->group_boundary;i++)
{
group_inner=(struct Maat_group_inner*)dynamic_array_read(relation->groups,i);
if(group_inner==NULL)
{
continue;
}
group_cmd=&(cmd->groups[grp_idx]);
group_cmd->group_id=group_inner->group_id;
if(group_inner->ref_by_parent_cnt>1)
{
continue;
}
group_cmd->region_num=group_inner->region_cnt;
group_cmd->regions=ALLOC(struct Maat_region_t, group_cmd->region_num);
for(j=0;j<group_inner->region_boundary;j++)
{
region_inner=(struct Maat_region_inner*)dynamic_array_read(group_inner->regions,j);
if(region_inner==NULL)
{
continue;
}
region_cmd=&(group_cmd->regions[j]);
region_cmd->table_name=_maat_strdup(Maat_table_get_name_by_id(feather->table_mgr, region_inner->table_id));
region_cmd->region_id=region_inner->region_id;
//NOTICE: region_type only avilable when OP_ADD,
region_cmd->region_type=REGION_EXPR;
}
grp_idx++;
}
pthread_rwlock_unlock(&(relation->rwlock));
return 0;
}
int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd, struct serial_rule_t* list, int size)
{
struct Maat_group_t* p_group=NULL;
struct Maat_region_t* p_region=NULL;
struct Maat_rule_t* p_m_rule=NULL;
struct Maat_cmd_t* cmd=&(_cmd->cmd);
enum MAAT_OPERATION op=_cmd->op;
int rule_num=0,i=0,j=0;
p_m_rule=&(cmd->compile);
char line[MAX_TABLE_LINE_SIZE];
time_t timeout=0;
if(_cmd->cmd.expire_after>0)
{
timeout=feather->server_time+_cmd->cmd.expire_after;
}
if(op==MAAT_OP_ADD)
{
serialize_compile(p_m_rule, _cmd->huge_service_defined, cmd->group_num, MAAT_OP_ADD, line, sizeof(line));
set_serial_rule(list+rule_num, MAAT_OP_ADD, cmd->compile.config_id, cmd->label_id, feather->compile_tn, line, timeout);
}
else
{
set_serial_rule(list+rule_num, op, cmd->compile.config_id, cmd->label_id, feather->compile_tn, NULL, timeout);
}
rule_num++;
for(i=0;i<cmd->group_num;i++)
{
p_group=&(cmd->groups[i]);
if(op==MAAT_OP_ADD)
{
if(feather->AUTO_NUMBERING_ON==1)
{
p_group->group_id=feather->base_grp_seq;
feather->base_grp_seq++;
}
p_group->parent_id=p_m_rule->config_id;
p_group->parent_type=PARENT_TYPE_COMPILE;
serialize_group(p_group, MAAT_OP_ADD, line, sizeof(line));
set_serial_rule(list+rule_num, MAAT_OP_ADD, p_group->group_id, 0, feather->group_tn, line, timeout);
}
else
{
set_serial_rule(list+rule_num,op,p_group->group_id,0,feather->group_tn,NULL,timeout);
}
rule_num++;
if(p_group->regions==NULL)//group reuse.
{
continue;
}
for(j=0;j<p_group->region_num;j++)
{
p_region=&(p_group->regions[j]);
if(op==MAAT_OP_ADD)
{
if(feather->AUTO_NUMBERING_ON==1)
{
p_region->region_id=feather->base_rgn_seq;
feather->base_rgn_seq++;
}
serialize_region(p_region, p_group->group_id, line, sizeof(line));
set_serial_rule(list+rule_num,MAAT_OP_ADD
,p_region->region_id,0,p_region->table_name,line,timeout);
}
else
{
set_serial_rule(list+rule_num,op
,p_region->region_id,0,p_region->table_name,NULL,timeout);
}
rule_num++;
}
}
assert(rule_num<=size);
return rule_num;
}
int mr_transaction_success(redisReply* data_reply)
{
if(data_reply->type==REDIS_REPLY_NIL)
@@ -1380,58 +1215,7 @@ error_out:
return success_cnt;
}
int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd)
{
int i=0, j=0;
const char *table_name=NULL;
int table_id=0;
struct Maat_group_t* p_group=NULL;
struct Maat_region_t* p_region=NULL;
enum MAAT_TABLE_TYPE table_type;
struct Maat_compile_inner *compile_rule=NULL;
if(feather->scanner!=NULL)
{
compile_rule=(struct Maat_compile_inner*)HASH_fetch_by_id(feather->scanner->compile_hash, cmd->compile.config_id);
if(compile_rule!=NULL)
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Maat rule %d already exisits.",cmd->compile.config_id);
return -1;
}
}
for(i=0;i<cmd->group_num;i++)
{
p_group=&(cmd->groups[i]);
for(j=0;j<p_group->region_num;j++)
{
p_region=&(p_group->regions[j]);
table_name=p_region->table_name;
table_id=Maat_table_get_id_by_name(feather->table_mgr, table_name);
if(table_id<0)
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Unknown table %s of Maat_cmd_t[%d]->group[%d]->region[%d]."
,table_name,cmd->compile.config_id,i,j);
return -1;
}
table_type=type_region2table(p_region);
if(table_type!=Maat_table_get_type_by_id(feather->table_mgr, table_id))
{
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module
,"Table %s not support region type %d of Maat_cmd_t[%d]->group[%d]->region[%d]."
,table_name
,p_region->region_type
,cmd->compile.config_id,i,j);
return -1;
}
free((char*)p_region->table_name);
p_region->table_name=_maat_strdup(Maat_table_get_name_by_id(feather->table_mgr, table_id));
}
}
return 0;
}
void check_maat_expiration(redisContext *ctx, void *logger)
{
unsigned int i=0,s_rule_num=0;
@@ -2023,70 +1807,8 @@ void _maat_empty_region(struct Maat_region_t* p)
memset(p,0,sizeof(struct Maat_region_t));
return;
}
struct Maat_cmd_t* Maat_create_cmd(const struct Maat_rule_t* rule, int group_num)
{
int i=0;
struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)calloc(sizeof(struct _Maat_cmd_inner_t),1);
memcpy(&(_cmd->cmd.compile),rule,sizeof(_cmd->cmd.compile));
_cmd->ref_cnt=1;
_cmd->cmd.group_num=group_num;
if(group_num>0)
{
_cmd->cmd.groups=(struct Maat_group_t*)calloc(sizeof(struct Maat_group_t),group_num);
}
else
{
_cmd->cmd.groups=NULL;
}
for(i=0;i<group_num;i++)
{
_cmd->cmd.groups[i].regions=(struct Maat_region_t*)calloc(sizeof(struct Maat_region_t),1);
_cmd->region_size[i]=1;
}
return (struct Maat_cmd_t*)_cmd;
}
int Maat_cmd_set_opt(struct Maat_cmd_t* cmd, enum MAAT_RULE_OPT type, const char* val, int size)
{
struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t* )cmd;
int ret=-1;
switch(type)
{
case MAAT_RULE_SERV_DEFINE:
if(size>MAX_HUGE_SERVICE_DEFINE_LEN)
{
ret=-1;
}
else
{
if(_cmd->huge_service_defined!=NULL)
{
free(_cmd->huge_service_defined);
}
_cmd->huge_service_defined=(char*)calloc(sizeof(char),size+1);
memcpy(_cmd->huge_service_defined,val, size);
ret=0;
}
break;
default:
break;
}
return ret;
}
int Maat_cmd_set_group(Maat_feather_t feather,int group_id, const struct Maat_region_t* region, enum MAAT_OPERATION op)
{
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
if(_feather->AUTO_NUMBERING_ON==1)
{
return -1;
}
//struct Maat_group_inner* group_inner=NULL;
//group_inner=(struct Maat_group_inner*)HASH_fetch_by_id(_feather->scanner->group_hash, group_id);
//NOT implemented yet.
assert(0);
return 0;
}
int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, int line_num ,enum MAAT_OPERATION op)
{
int i=0, j=0;
@@ -2200,7 +1922,7 @@ error_out:
free(s_rule);
return ret;
}
int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_cmd_line* line_rule, enum MAAT_OPERATION op)
{
int ret=0;
@@ -2254,203 +1976,7 @@ int Maat_cmd_set_file(Maat_feather_t feather,const char* key, const char* value,
}
freeReplyObject(reply);
return 1;
}
void Maat_add_region2cmd(struct Maat_cmd_t* cmd,int which_group,const struct Maat_region_t* region)
{
struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)cmd;
struct Maat_region_t* dst=NULL;
struct Maat_group_t* p_group=NULL;
p_group=&(cmd->groups[which_group]);
assert(which_group<cmd->group_num);
assert(region->table_name!=NULL);
if(p_group->region_num==_cmd->region_size[which_group])
{
_cmd->region_size[which_group]*=2;
p_group->regions=(struct Maat_region_t*)realloc(p_group->regions,sizeof(struct Maat_region_t)*_cmd->region_size[which_group]);
}
dst=&(p_group->regions[p_group->region_num]);
p_group->region_num++;
_maat_copy_region(dst, region);
return;
}
void Maat_free_cmd(struct Maat_cmd_t* cmd)
{
struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)cmd;
int i=0,j=0;
_cmd->ref_cnt--;
if(_cmd->ref_cnt>0)
{
return;
}
for(i=0;i<cmd->group_num;i++)
{
for(j=0;j<cmd->groups[i].region_num;j++)
{
_maat_empty_region(&(cmd->groups[i].regions[j]));
}
free(cmd->groups[i].regions);
cmd->groups[i].regions=NULL;
}
free(cmd->groups);
cmd->groups=NULL;
if(_cmd->huge_service_defined!=NULL)
{
free(_cmd->huge_service_defined);
_cmd->huge_service_defined=NULL;
}
_cmd->next=NULL;
free(_cmd);
return;
}
int Maat_format_cmd(struct Maat_cmd_t* rule, char* buffer, int size)
{
//TODO
return 0;
}
int Maat_cmd(Maat_feather_t feather,struct Maat_cmd_t* raw_rule,enum MAAT_OPERATION op)
{
int ret=0;
ret=Maat_cmd_append(feather,raw_rule,op);
if(ret<0)
{
return ret;
}
ret=Maat_cmd_commit(feather);
return ret;
}
int Maat_cmd_append(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPERATION op)
{
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)cmd;
int ret=0;
_cmd->op=op;
assert(op==MAAT_OP_DEL||op==MAAT_OP_ADD||op==MAAT_OP_RENEW_TIMEOUT);
assert(_cmd->next==NULL);
if(op==MAAT_OP_RENEW_TIMEOUT)
{
assert(cmd->expire_after>0);
}
switch(op)
{
case MAAT_OP_DEL:
case MAAT_OP_RENEW_TIMEOUT:
ret=reconstruct_cmd(_feather, _cmd);
break;
case MAAT_OP_ADD:
ret=fix_table_name(_feather, cmd);
break;
}
if(ret<0)
{
return -1;
}
_cmd->ref_cnt++;
if(_feather->cmd_q_cnt==0)
{
assert(_feather->cmd_qtail==NULL);
assert(_feather->cmd_qtail==_feather->cmd_qhead);
_feather->cmd_qhead=_feather->cmd_qtail=_cmd;
}
else
{
_feather->cmd_qtail->next=_cmd;
_feather->cmd_qtail=_cmd;
}
_feather->cmd_q_cnt++;
return 0;
}
int Maat_cmd_commit(Maat_feather_t feather)
{
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
int ret=-1, i=0;
int new_region_num=0,new_group_num=0;
int serial_rule_num=0,serial_rule_idx=0;
UNUSED int transaction_success=1;
struct _Maat_cmd_inner_t* p=NULL,*n=NULL;
redisContext* write_ctx=NULL;
redisReply* data_reply=NULL;
struct serial_rule_t* s_rule=NULL;
if(_feather->input_mode!=SOURCE_REDIS)
{
return ret;
}
if(_feather->cmd_q_cnt==0)
{
return 0;
}
write_ctx=get_redis_ctx_for_write(_feather);
if(write_ctx==NULL)
{
goto error_out;
}
for(i=0,p=_feather->cmd_qhead;i<_feather->cmd_q_cnt;i++)
{
serial_rule_num+=calculate_serial_rule_num(p, &new_region_num, &new_group_num);
p=p->next;
}
_feather->server_time=redis_server_time(write_ctx);
if(!_feather->server_time)
{
goto error_out;
}
if(_feather->AUTO_NUMBERING_ON==1)
{
data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", mr_region_id_var, new_region_num);
if(data_reply->type!=REDIS_REPLY_INTEGER)
{
freeReplyObject(data_reply);
goto error_out;
}
_feather->base_rgn_seq=data_reply->integer-new_region_num;
freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", mr_group_id_var, new_group_num);
if(data_reply->type!=REDIS_REPLY_INTEGER)
{
freeReplyObject(data_reply);
goto error_out;
}
_feather->base_grp_seq=data_reply->integer-new_group_num;
freeReplyObject(data_reply);
}
s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t),serial_rule_num);
for(i=0,p=_feather->cmd_qhead;i<_feather->cmd_q_cnt;i++)
{
serial_rule_idx+=build_serial_rule(_feather,p,s_rule+serial_rule_idx, serial_rule_num-serial_rule_idx);
p=p->next;
}
assert(serial_rule_idx==serial_rule_num);
transaction_success=0;
transaction_success=exec_serial_rule(write_ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger);
assert(transaction_success==serial_rule_num);
ret=_feather->cmd_q_cnt;
_feather->cmd_acc_num+=_feather->cmd_q_cnt;
error_out:
p=_feather->cmd_qhead;
for(i=0;i<_feather->cmd_q_cnt;i++)
{
n=p->next;
Maat_free_cmd((struct Maat_cmd_t* )p);
p=n;
}
_feather->cmd_qhead=_feather->cmd_qtail=NULL;
_feather->cmd_q_cnt=0;
for(i=0;i<serial_rule_num && s_rule;i++)
{
empty_serial_rules(s_rule+i);
}
free(s_rule);
return ret;
long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment)
{
@@ -2613,45 +2139,62 @@ int redis_flush_DB(redisContext* ctx, int db_index, void* logger)
);
}
return redis_transaction_success;
}
}
int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after)
{
redisContext* write_ctx=get_redis_ctx_for_write(_feather);
_feather->server_time=redis_server_time(write_ctx);
if(!_feather->server_time)
{
return -1;
}
struct serial_rule_t s_rule;
set_serial_rule(&s_rule, op, id, 0, table_name, line, 0);
int transaction_success=0;
transaction_success=exec_serial_rule(write_ctx, &s_rule, 1, _feather->server_time, _feather->logger);
empty_serial_rules(&s_rule);
return transaction_success;
}
int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int group_num)
{
{
struct Maat_cmd_line line_cmd;
char line[MAX_TABLE_LINE_SIZE];
serialize_compile(compile, huge_service_defined, group_num, op, line, sizeof(line));
char line[MAX_TABLE_LINE_SIZE];
serialize_compile(compile, huge_service_defined, clause_num, op, line, sizeof(line));
memset(&line_cmd, 0, sizeof(line_cmd));
line_cmd.table_name=table_name;
line_cmd.rule_id=compile->config_id;
line_cmd.table_line=line;
line_cmd.label_id=label_id;
line_cmd.expire_after=expire_after;
int ret=Maat_cmd_set_line(feather, &line_cmd, op);
return ret;
}
}
int Maat_command_raw_set_region(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id)
{
{
struct Maat_cmd_line line_cmd;
char line[MAX_TABLE_LINE_SIZE];
serialize_region(region, group_id, line, sizeof(line));
serialize_region(region, group_id, line, sizeof(line));
memset(&line_cmd, 0, sizeof(line_cmd));
line_cmd.table_name=region->table_name;
line_cmd.rule_id=region->region_id;
line_cmd.table_line=line;
int ret=Maat_cmd_set_line(feather, &line_cmd, op);
return ret;
}
}
int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c)
{
{
struct Maat_cmd_line line_cmd;
char line[MAX_TABLE_LINE_SIZE];
serialize_group(group, op, line, sizeof(line));
char line[MAX_TABLE_LINE_SIZE];
serialize_group2compile(op, g2c, line, sizeof(line));
assert(g2c->group_id<1024*1024);
memset(&line_cmd, 0, sizeof(line_cmd));
line_cmd.table_name=g2c->table_name;
line_cmd.rule_id=g2c->compile_id*1024*1024+g2c->group_id;
line_cmd.table_line=line;
int ret=Maat_cmd_set_line(feather, &line_cmd, op);
return ret;
}
int Maat_command_raw_set_group2group(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g)
{
struct Maat_cmd_line line_cmd;
char line[MAX_TABLE_LINE_SIZE];
assert(g2g->group_id<1024*1024);
serialize_group2group(op, g2g, line, sizeof(line));
memset(&line_cmd, 0, sizeof(line_cmd));
line_cmd.table_name=g2g->table_name;
line_cmd.rule_id=g2g->superior_group_id*1024*1024+g2g->group_id;
line_cmd.table_line=line;
int ret=Maat_cmd_set_line(feather, &line_cmd, op);
return ret;
}
int Maat_cmd_flushDB(Maat_feather_t feather)
{