From 3571096bb62b5bbadae1cbea02729216fd51c8c1 Mon Sep 17 00:00:00 2001 From: zhengchao Date: Thu, 6 Jul 2017 21:20:24 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=94=AF=E6=8C=81=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E8=B6=85=E6=97=B6=EF=BC=9B2=E3=80=81=E6=94=AF=E6=8C=81plugin?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=EF=BC=9B3=E3=80=81=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=A0=87=E7=AD=BE=E6=9F=A5=E8=AF=A2=EF=BC=9B4=E3=80=81?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E9=80=9A=E8=BF=87redis=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E5=85=A8=E5=B1=80=E5=BA=8F=E5=88=97=E5=8F=B7=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- inc/Maat_command.h | 39 +-- inc/Maat_rule.h | 28 ++- src/entry/Maat_api.cpp | 16 ++ src/entry/Maat_command.cpp | 437 +++++++++++++++++++++++---------- src/entry/Maat_rule.cpp | 4 +- src/entry/Maat_rule_internal.h | 4 +- test/maat_test.cpp | 33 +-- 7 files changed, 390 insertions(+), 171 deletions(-) diff --git a/inc/Maat_command.h b/inc/Maat_command.h index b749175..271ab85 100644 --- a/inc/Maat_command.h +++ b/inc/Maat_command.h @@ -9,6 +9,7 @@ enum MAAT_OPERATION MAAT_OP_DEL=0, MAAT_OP_ADD }; + enum MAAT_REGION_TYPE { REGION_EXPR, @@ -89,7 +90,7 @@ struct Maat_rgn_sim_t struct Maat_region_t { const char* table_name; - int region_id; //Any, maat will assigned one. + int region_id; //If MAAT_OPT_CMD_AUTO_NUMBERING==1, maat will assigned one. Or users must appoint a unique number. enum MAAT_REGION_TYPE region_type; union { @@ -103,26 +104,36 @@ struct Maat_region_t struct Maat_group_t { int region_num; - int group_id; //Any, maat will assigned one. - char* group_name;//optional, for group reuse. + int group_id; //If MAAT_OPT_CMD_AUTO_NUMBERING==1, maat will assigned one. Or users must appoint a unique number. struct Maat_region_t *regions; }; -struct Maat_command_t +struct Maat_cmd_t { struct Maat_rule_t compile;// for MAAT_OP_DEL, only compile.config_id is necessary. - int group_num; // for MAAT_OP_DEL, Any. - struct Maat_group_t* groups;// for MAAT_OP_DEL, SET to NULL. + int group_num; // for MAAT_OP_DEL, set to 0. + int expire_after; //expired after $timeout$ seconds, set to 0 for never timeout. + int label_id; //>0, for Maat_cmd_select + struct Maat_group_t* groups;// for MAAT_OP_DEL, set to NULL. }; -struct Maat_command_t* Maat_create_comand(const struct Maat_rule_t* rule, int group_num); -void Maat_cmd_add_region(struct Maat_command_t* cmd,int which_group,const struct Maat_region_t* region); -void Maat_free_command(struct Maat_command_t* cmd); -int Maat_format_command(struct Maat_command_t* cmd, char* buffer, int size); +struct Maat_cmd_t* Maat_create_cmd(const struct Maat_rule_t* rule, int group_num, const char* label); +void Maat_add_region2cmd(struct Maat_cmd_t* cmd,int which_group,const struct Maat_region_t* region); -// The command functions are NOT thread safe. -int Maat_command(Maat_feather_t feather,struct Maat_command_t* cmd,enum MAAT_OPERATION op); +void Maat_free_cmd(struct Maat_cmd_t* cmd); +int Maat_format_cmd(struct Maat_cmd_t* cmd, char* buffer, int size); + +// The below command functions are NOT thread safe. +int Maat_cmd(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPERATION op); //pipeline model -int Maat_append_command(Maat_feather_t feather,struct Maat_command_t* cmd,enum MAAT_OPERATION op); -int Maat_commit_command(Maat_feather_t feather); +int Maat_cmd_append(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPERATION op); +int Maat_cmd_commit(Maat_feather_t feather); + + +int Maat_cmd_set_group(Maat_feather_t feather, int group_id, const struct Maat_region_t* region, enum MAAT_OPERATION op); +int Maat_cmd_set_line(Maat_feather_t feather, const char* table_name,int rule_id, const char* line, int timeout, enum MAAT_OPERATION op); + +//Return the value of key after the increment. +long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment); +int Maat_cmd_select(Maat_feather_t feather, int label, int * output_ids, int size); #endif diff --git a/inc/Maat_rule.h b/inc/Maat_rule.h index 9bf3d13..d538cc2 100644 --- a/inc/Maat_rule.h +++ b/inc/Maat_rule.h @@ -135,20 +135,22 @@ int Maat_initiate_feather(Maat_feather_t feather); enum MAAT_INIT_OPT { - MAAT_OPT_SCANDIR_INTERVAL_MS=1, //VALUE is interger,SIZE=sizeof(int). DEFAULT:1,000 milliseconds. - MAAT_OPT_EFFECT_INVERVAL_MS, //VALUE is interger,SIZE=sizeof(int). DEFAULT:60,000 milliseconds. - MAAT_OPT_FULL_CFG_DIR, //VALUE is a const char*,MUST end with '\0',SIZE= strlen(string+'\0')+1.DEFAULT: no default. - MAAT_OPT_INC_CFG_DIR, //VALUE is a const char*,MUST end with '\0',SIZE= strlen(string+'\0')+1.DEFAULT: no default. - MAAT_OPT_JSON_FILE_PATH, //VALUE is a const char*,MUST end with '\0',SIZE= strlen(string+'\0')+1.DEFAULT: no default. - MAAT_OPT_STAT_ON, //VALUE is NULL,SIZE is 0.MAAT_OPT_STAT_FILE_PATH must be set.Default: stat OFF. - MAAT_OPT_PERF_ON, //VALUE is NULL,SIZE is 0.MAAT_OPT_STAT_FILE_PATH must be set.Default: stat OFF. - MAAT_OPT_STAT_FILE_PATH, //VALUE is a const char*,MUST end with '\0',SIZE= strlen(string+'\0')+1.DEFAULT: no default. - MAAT_OPT_SCAN_DETAIL, //VALUE is interger *,SIZE=sizeof(int). 0: not return any detail;1: return hit pos, not include regex grouping; + MAAT_OPT_SCANDIR_INTERVAL_MS=1, //VALUE is interger, SIZE=sizeof(int). DEFAULT:1,000 milliseconds. + MAAT_OPT_EFFECT_INVERVAL_MS, //VALUE is interger, SIZE=sizeof(int). DEFAULT:60,000 milliseconds. + MAAT_OPT_FULL_CFG_DIR, //VALUE is a const char*, MUST end with '\0', SIZE= strlen(string+'\0')+1.DEFAULT: no default. + MAAT_OPT_INC_CFG_DIR, //VALUE is a const char*, MUST end with '\0', SIZE= strlen(string+'\0')+1.DEFAULT: no default. + MAAT_OPT_JSON_FILE_PATH, //VALUE is a const char*, MUST end with '\0', SIZE= strlen(string+'\0')+1.DEFAULT: no default. + MAAT_OPT_STAT_ON, //VALUE is NULL,SIZE is 0. MAAT_OPT_STAT_FILE_PATH must be set. Default: stat OFF. + MAAT_OPT_PERF_ON, //VALUE is NULL,SIZE is 0. MAAT_OPT_STAT_FILE_PATH must be set. Default: stat OFF. + MAAT_OPT_STAT_FILE_PATH, //VALUE is a const char*, MUST end with '\0', SIZE= strlen(string+'\0')+1. DEFAULT: no default. + MAAT_OPT_SCAN_DETAIL, //VALUE is interger *, SIZE=sizeof(int). 0: not return any detail;1: return hit pos, not include regex grouping; // 2 return hit pos and regex grouping pos;DEFAULT:0 - MAAT_OPT_INSTANCE_NAME, //VALUE is a const char*,MUST end with '\0',SIZE= strlen(string+'\0')+1,no more than 11 bytes.DEFAULT: MAAT_$tableinfo_path$. - MAAT_OPT_DECRYPT_KEY, //VALUE is a const char*,MUST end with '\0',SIZE= strlen(string+'\0')+1. No DEFAULT. - MAAT_OPT_REDIS_IP, //VALUE is a const char*,MUST end with '\0',SIZE= strlen(string+'\0')+1. No DEFAULT. - MAAT_OPT_REDIS_PORT //VALUE is a unsigned short, host order, SIZE= sizeof(unsigned short). No DEFAULT. + MAAT_OPT_INSTANCE_NAME, //VALUE is a const char*, MUST end with '\0', SIZE= strlen(string+'\0')+1, no more than 11 bytes.DEFAULT: MAAT_$tableinfo_path$. + MAAT_OPT_DECRYPT_KEY, //VALUE is a const char*, MUST end with '\0', SIZE= strlen(string+'\0')+1. No DEFAULT. + MAAT_OPT_REDIS_IP, //VALUE is a const char*, MUST end with '\0', SIZE= strlen(string+'\0')+1. No DEFAULT. + MAAT_OPT_REDIS_PORT, //VALUE is a unsigned short, host order, SIZE= sizeof(unsigned short). No DEFAULT. + MAAT_OPT_REDIS_INDEX, //VALUE is interger *, 0~15, SIZE=sizeof(int). DEFAULT: 0. + MAAT_OPT_CMD_AUTO_NUMBERING //VALUE is interger *, 1 or 0, SIZE=sizeof(int). DEFAULT: 1. }; //return -1 if failed, return 0 on success; int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const void* value,int size); diff --git a/src/entry/Maat_api.cpp b/src/entry/Maat_api.cpp index 0d2f3d8..b5d0753 100644 --- a/src/entry/Maat_api.cpp +++ b/src/entry/Maat_api.cpp @@ -478,6 +478,8 @@ Maat_feather_t Maat_feather(int max_thread_num,const char* table_info_path,void* feather->last_full_version=0; feather->base_grp_seq=0; feather->base_rgn_seq=0; + feather->redis_index=0; + feather->AUTO_NUMBERING_ON=1; feather->connect_timeout.tv_sec=0; feather->connect_timeout.tv_usec=100*1000; // 100 ms pthread_mutex_init(&(feather->plugin_table_reg_mutex),NULL); @@ -589,6 +591,20 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo } _feather->redis_port=*((unsigned short*)value); break; + case MAAT_OPT_REDIS_INDEX: + if((size_t)size!=sizeof(int)) + { + return -1; + } + _feather->redis_index=*((int*)value); + break; + case MAAT_OPT_CMD_AUTO_NUMBERING: + if((size_t)size!=sizeof(int)) + { + return -1; + } + _feather->AUTO_NUMBERING_ON=*((int*)value); + break; default: return -1; } diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 3089476..b1aacea 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -13,20 +13,24 @@ const char* maat_redis_monitor="MAAT_REDIS_MONITOR"; const char* maat_redis_command="MAAT_REDIS_COMMAND"; const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"}; -const char* rm_status_key="MAAT_UPDATE_STATUS"; +const char* rm_status_sset="MAAT_UPDATE_STATUS"; +const char* rm_expire_sset="MAAT_EXPIRE_TIMER"; +const char* rm_label_sset="MAAT_LABEL_INDEX"; const int MAAT_REDIS_SYNC_TIME=30*60; struct serial_rule_t //rm= Redis Maat { enum MAAT_OPERATION op;//0: delete, 1: add. int rule_id; + int label_id; + long long timeout; // absolute unix time. char table_name[256]; char* table_line; }; struct _Maat_cmd_inner_t { - struct Maat_command_t cmd; + struct Maat_cmd_t cmd; enum MAAT_OPERATION op; int ref_cnt; int region_size[MAX_EXPR_ITEM_NUM]; @@ -65,6 +69,16 @@ long long read_redis_integer(const redisReply* reply) } return 0; } +long long redis_server_time(redisContext* ctx) +{ + long long server_time=0; + redisReply* data_reply=NULL; + data_reply=_wrap_redisCommand(ctx,"TIME"); + assert(data_reply->type==REDIS_REPLY_INTEGER); + server_time=data_reply->integer; + freeReplyObject(data_reply); + return server_time; +} enum MAAT_TABLE_TYPE type_region2table(const struct Maat_region_t* p) { enum MAAT_TABLE_TYPE ret=TABLE_TYPE_IP; @@ -151,6 +165,43 @@ void invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq) line[i]='0'; return; } +int del_rule_from_redis(redisContext* ctx, struct serial_rule_t* s_rule, long long new_version) +{ + int append_cmd_cnt=0£» + redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d" + ,rm_key_prefix[MAAT_OP_ADD] + ,s_rule->table_name + ,s_rule->rule_id + ,rm_key_prefix[MAAT_OP_DEL] + ,s_rule->table_name + ,s_rule->rule_id + ); + append_cmd_cnt++; + redisAppendCommand(ctx,"EXPIRE %s:%s,%d %d",rm_key_prefix[MAAT_OP_DEL] + ,s_rule->table_name + ,s_rule->rule_id + ,MAAT_REDIS_SYNC_TIME); + append_cmd_cnt++; + //NX: Don't update already exisiting elements. Always add new elements. + redisAppendCommand(ctx,"ZADD %s NX %d DEL,%s,%d",rm_status_sset + ,new_version + ,s_rule->table_name + ,s_rule->rule_id); + append_cmd_cnt++; + + // Try to remove from expiration sorted set, no matter wheather it exists or not. + redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset + ,s_rule->table_name + ,s_rule->rule_id); + append_cmd_cnt++; + if(s_rule->label_id>0) + { + redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset + ,s_rule->rule_id); + } + + return append_cmd_cnt; +} void serialize_region(const struct Maat_region_t* p,int group_id, char* buff,int size) { int ret=0; @@ -229,10 +280,12 @@ void empty_serial_rules(struct serial_rule_t* rule) memset(rule,0,sizeof(struct serial_rule_t)); return; } -void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_id,const char* table_name,const char* line) +void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_id,int label_id,const char* table_name,const char* line, long long timeout) { rule->op=op; rule->rule_id=rule_id; + rule->label_id=label_id; + rule->timeout=timeout; assert(strlen(table_name)table_name)); memcpy(rule->table_name,table_name,strlen(table_name)); if(line!=NULL) @@ -293,16 +346,16 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** //Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis. //The elements are considered to be ordered from low to high scores(version). - reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%d %d",rm_status_key,version,version_in_redis); + reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%d %d",rm_status_sset,version,version_in_redis); if(reply==NULL) { __redis_strerror_r(errno,err_buff,sizeof(err_buff)); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "GET %s failed %s.",rm_status_key,err_buff); + "GET %s failed %s.",rm_status_sset,err_buff); return 0; } assert(reply->type==REDIS_REPLY_ARRAY); - tmp_reply=(redisReply*)redisCommand(c, "ZSCORE %s %s",rm_status_key,reply->element[0]->str); + tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str); nearest_rule_version=read_redis_integer(tmp_reply); freeReplyObject(tmp_reply); tmp_reply=NULL; @@ -366,7 +419,7 @@ int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cn { int serial_num=0; int i=0; - struct Maat_command_t* cmd=&(_cmd->cmd); + struct Maat_cmd_t* cmd=&(_cmd->cmd); serial_num++;//compile rule for(i=0;igroup_num;i++) { @@ -378,7 +431,7 @@ int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cn if(_cmd->op==MAAT_OP_ADD) { *new_region_cnt+=cmd->groups[i].region_num; - (*new_group_cnt)++; + (*new_group_cnt)++; } //for MAAT_OP_DEL, if the group's refcnt>0, group->region_num=0. //so it's OK to add. @@ -389,7 +442,7 @@ int calculate_serial_rule_num(struct _Maat_cmd_inner_t* _cmd,int * new_region_cn int reconstruct_cmd(struct _Maat_feather_t *feather, struct _Maat_cmd_inner_t* _cmd) { int i=0,j=0,grp_idx=0; - struct Maat_command_t* cmd=&(_cmd->cmd); + struct Maat_cmd_t* cmd=&(_cmd->cmd); struct Maat_group_t* group_cmd=NULL; struct Maat_region_t* region_cmd=NULL; @@ -450,12 +503,17 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st struct Maat_group_t* p_group=NULL; struct Maat_region_t* p_region=NULL; struct Maat_rule_t* p_m_rule=NULL; - struct Maat_command_t* cmd=&(_cmd->cmd); + struct Maat_cmd_t* cmd=&(_cmd->cmd); enum MAAT_OPERATION op=_cmd->op; int rule_num=0,i=0,j=0; p_m_rule=&(cmd->compile); char line[1024]; + time_t timeout=0; + if(_cmd->cmd.expire_after>0) + { + timeout=feather->server_time+_cmd->cmd.expire_after; + } if(op==MAAT_OP_ADD) { snprintf(line,sizeof(line),"%d\t%d\t%hhd\t%hhd\t%hhd\t0\t%s\t1\t%d",p_m_rule->config_id @@ -465,12 +523,12 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st ,p_m_rule->do_log ,p_m_rule->service_defined ,cmd->group_num); - set_serial_rule(list+rule_num,MAAT_OP_ADD,cmd->compile.config_id,feather->compile_tn,line); + set_serial_rule(list+rule_num,MAAT_OP_ADD,cmd->compile.config_id,cmd->label_id,feather->compile_tn,line,timeout); } else { - set_serial_rule(list+rule_num,MAAT_OP_DEL,cmd->compile.config_id,feather->compile_tn,NULL); + set_serial_rule(list+rule_num,MAAT_OP_DEL,cmd->compile.config_id,cmd->label_id,feather->compile_tn,NULL,timeout); } rule_num++; for(i=0;igroup_num;i++) @@ -478,15 +536,18 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st p_group=&(cmd->groups[i]); if(op==MAAT_OP_ADD) { - p_group->group_id=feather->base_grp_seq; - feather->base_grp_seq++; + if(feather->AUTO_NUMBERING_ON==1) + { + p_group->group_id=feather->base_grp_seq; + feather->base_grp_seq++; + } snprintf(line,sizeof(line),"%d\t%d\t1",p_group->group_id ,p_m_rule->config_id); - set_serial_rule(list+rule_num,MAAT_OP_ADD,p_group->group_id,feather->group_tn,line); + set_serial_rule(list+rule_num,MAAT_OP_ADD,p_group->group_id,0,feather->group_tn,line); } else { - set_serial_rule(list+rule_num,MAAT_OP_DEL,p_group->group_id,feather->group_tn,NULL); + set_serial_rule(list+rule_num,MAAT_OP_DEL,p_group->group_id,0,feather->group_tn,NULL); } rule_num++; if(p_group->regions==NULL)//group reuse. @@ -498,16 +559,19 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st p_region=&(p_group->regions[j]); if(op==MAAT_OP_ADD) { - p_region->region_id=feather->base_rgn_seq; - feather->base_rgn_seq++; + if(feather->AUTO_NUMBERING_ON==1) + { + p_region->region_id=feather->base_rgn_seq; + feather->base_rgn_seq++; + } serialize_region(p_region, p_group->group_id, line, sizeof(line)); set_serial_rule(list+rule_num,MAAT_OP_ADD - ,p_region->region_id,p_region->table_name,line); + ,p_region->region_id,0,p_region->table_name,line); } else { set_serial_rule(list+rule_num,MAAT_OP_DEL - ,p_region->region_id,p_region->table_name,NULL); + ,p_region->region_id,0,p_region->table_name,NULL); } rule_num++; @@ -527,7 +591,7 @@ int mr_transaction_success(redisReply* data_reply) return 1; } } -int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd) +int fix_table_name(_Maat_feather_t* feather,struct Maat_cmd_t* cmd) { int i=0,j=0,ret=0; const char *table_name=NULL; @@ -558,7 +622,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd) if(ret<0) { MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module - ,"Unknown table %s of Maat_command_t[%d]->group[%d]->region[%d]." + ,"Unknown table %s of Maat_cmd_t[%d]->group[%d]->region[%d]." ,table_name,cmd->compile.config_id,i,j); return -1; @@ -567,7 +631,7 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd) if(table_type!=feather->p_table_info[table_id]->table_type) { MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module - ,"Table %s not support region type %d of Maat_command_t[%d]->group[%d]->region[%d]." + ,"Table %s not support region type %d of Maat_cmd_t[%d]->group[%d]->region[%d]." ,table_name ,p_region->region_type ,cmd->compile.config_id,i,j); @@ -579,7 +643,46 @@ int fix_table_name(_Maat_feather_t* feather,struct Maat_command_t* cmd) } return 0; } - +void check_maat_expiration(redisContext *ctx, void *logger) +{ + unsigned int i=0,s_rule_num=0; + int ret=0,append_cmd_cnt=0; + int is_success=0; + redisReply* data_reply=NULL; + struct serial_rule_t* s_rule=NULL; + long long server_time=0,maat_redis_version=0; + data_reply=_wrap_redisCommand(ctx, "TIME"); + server_time=data_reply->element[0].integer; + freeReplyObject(data_reply); + data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_expire_sset,server_time); + if(data_reply->type!=REDIS_REPLY_ARRAY) + { + freeReplyObject(data_reply); + return; + } + s_rule_num=data_reply->elements; + s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t)*s_rule_num); + for(i=0;ielement[i].str,"%[^,],%d",s_rule[i].table_name,&(s_rule.rule_id)); + assert(ret==2); + } + is_success=exec_serial_rule(ctx,s_rule, s_rule_num); + + if(is_success==1) + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module + ,"Succesfully expried %d rules in Redis.", s_rule_num); + } + else + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module + ,"Failed to expried %d rules in Redis.", s_rule_num); + } + free(s_rule); + return; +} void redis_monitor_traverse(unsigned int version,redisContext *c, void (*start)(unsigned int ,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para void (*update)(const char* ,const char*,void* ),//table name ,line ,u_para @@ -598,6 +701,11 @@ void redis_monitor_traverse(unsigned int version,redisContext *c, unsigned int new_version=0; enum MAAT_TABLE_TYPE table_type; void* logger=feather->logger; + if(feather->redis_write_ctx!=NULL)//authorized to write + { + //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. + check_maat_expiration(feather->redis_read_ctx, logger); + } rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type); if(rule_num==0) { @@ -635,8 +743,75 @@ void redis_monitor_traverse(unsigned int version,redisContext *c, rule_list=NULL; return; } +int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num) +{ + int append_cmd_cnt=0,i=0; + long long maat_redis_version=0; + redisReply* data_reply=NULL; + int redis_transaction_success=1; + data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); + freeReplyObject(data_reply); + data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); + maat_redis_version=read_redis_integer(data_reply); + maat_redis_version++; + freeReplyObject(data_reply); + data_reply=_wrap_redisCommand(ctx,"MULTI"); + freeReplyObject(data_reply); + append_cmd_cnt=0; + for(i=0;i0) + { + redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset + ,s_rule[i].timeout + ,s_rule[i].table_name + ,s_rule[i].rule_id); + append_cmd_cnt++; + } + if(s_rule[i].label_id>0) + { + redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset + ,s_rule[i].label_id + ,s_rule[i].rule_id); + append_cmd_cnt++; + } + } + else + { + append_cmd_cnt+=del_rule_from_redis(ctx,s_rule+i,maat_redis_version); + } -void Maat_copy_region(struct Maat_region_t* dst,const struct Maat_region_t* src) + } + redisAppendCommand(ctx,"INCRBY MAAT_VERSION 1"); + append_cmd_cnt++; + redisAppendCommand(ctx,"EXEC"); + append_cmd_cnt++; + redis_transaction_success=1; + for(i=0;itable_name!=NULL) @@ -668,7 +843,7 @@ void Maat_copy_region(struct Maat_region_t* dst,const struct Maat_region_t* src) } return; } -void Maat_empty_region(struct Maat_region_t* p) +void _maat_empty_region(struct Maat_region_t* p) { free((char*)p->table_name); p->table_name=NULL; @@ -699,7 +874,7 @@ void Maat_empty_region(struct Maat_region_t* p) return; } -struct Maat_command_t* Maat_create_comand(const struct Maat_rule_t* rule, int group_num) +struct Maat_cmd_t* Maat_create_cmd(const struct Maat_rule_t* rule, int group_num, const char* label) { int i=0; struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)calloc(sizeof(struct _Maat_cmd_inner_t),1); @@ -719,21 +894,65 @@ struct Maat_command_t* Maat_create_comand(const struct Maat_rule_t* rule, int gr _cmd->cmd.groups[i].regions=(struct Maat_region_t*)calloc(sizeof(struct Maat_region_t),1); _cmd->region_size[i]=1; } - return (struct Maat_command_t*)_cmd; + return (struct Maat_cmd_t*)_cmd; } -void Maat_cmd_set_group(struct Maat_command_t* cmd,int which_group,const char* group_name) +int Maat_cmd_set_group(Maat_feather_t feather,int group_id, const struct Maat_region_t* region, enum MAAT_OPERATION op) { - assert(which_groupgroup_num); - - if(cmd->groups[which_group].group_name!=NULL) + _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + if(_feather->AUTO_NUMBERING_ON==1) { - free(cmd->groups[which_group].group_name); + return -1; } - cmd->groups[which_group].group_name=_maat_strdup(group_name); - - return; + //struct _Maat_group_inner_t* group_inner=NULL; + //group_inner=(struct _Maat_group_inner_t*)HASH_fetch_by_id(_feather->scanner->group_hash, group_id); + //NOT implemented yet. + assert(0); + return 0; } -void Maat_cmd_add_region(struct Maat_command_t* cmd,int which_group,const struct Maat_region_t* region) +int Maat_cmd_set_line(Maat_feather_t feather, const char* table_name,int rule_id, const char* line, int timeout,enum MAAT_OPERATION op) +{ + _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + int ret=0, table_id=0,retry=0; + struct serial_rule_t s_rule; + long long absolute_expire_time=0; + if(_feather->AUTO_NUMBERING_ON==1) + { + return -1; + } + ret=map_str2int(_feather->map_tablename2id, table_name, &table_id); + if(ret<0) + { + MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module + ,"Command set line id %d failed: unknown table %s." + ,rule_id + ,table_name); + + return -1; + } + if(TABLE_TYPE_PLUGIN!=feather->p_table_info[table_id]->table_type) + { + MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module + ,"Command set line id %d failed: table %s is not a plugin table." + ,rule_id + ,table_name); + return -1; + } + if(timeout>0) + { + absolute_expire_time=redis_server_time(_feather->redis_write_ctx); + absolute_expire_time+=timeout; + } + set_serial_rule(&s_rule, op, rule_id,table_name,line, absolute_expire_time); + ret=0; + while(!ret) + { + ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1); + retry++; + assert(retry<5); + } + return 0; +} +void Maat_add_region2cmd(struct Maat_cmd_t* cmd,int which_group,const struct Maat_region_t* region) { struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)cmd; struct Maat_region_t* dst=NULL; @@ -748,11 +967,11 @@ void Maat_cmd_add_region(struct Maat_command_t* cmd,int which_group,const struct } dst=&(p_group->regions[p_group->region_num]); p_group->region_num++; - Maat_copy_region(dst, region); + _maat_copy_region(dst, region); return; } -void Maat_free_command(struct Maat_command_t* cmd) +void Maat_free_cmd(struct Maat_cmd_t* cmd) { struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)cmd; int i=0,j=0; @@ -765,9 +984,8 @@ void Maat_free_command(struct Maat_command_t* cmd) { for(j=0;jgroups[i].region_num;j++) { - Maat_empty_region(&(cmd->groups[i].regions[j])); + _maat_empty_region(&(cmd->groups[i].regions[j])); } - free(cmd->groups[i].group_name); free(cmd->groups[i].regions); cmd->groups[i].regions=NULL; } @@ -777,24 +995,24 @@ void Maat_free_command(struct Maat_command_t* cmd) free(_cmd); return; } -int Maat_format_command(struct Maat_command_t* rule, char* buffer, int size) +int Maat_format_cmd(struct Maat_cmd_t* rule, char* buffer, int size) { //TODO return 0; } -int Maat_command(Maat_feather_t feather,struct Maat_command_t* raw_rule,enum MAAT_OPERATION op) +int Maat_cmd(Maat_feather_t feather,struct Maat_cmd_t* raw_rule,enum MAAT_OPERATION op) { int ret=0; - ret=Maat_append_command(feather,raw_rule,op); + ret=Maat_cmd_append(feather,raw_rule,op); if(ret<0) { return ret; } - ret=Maat_commit_command(feather); + ret=Maat_cmd_commit(feather); return ret; } -int Maat_append_command(Maat_feather_t feather,struct Maat_command_t* cmd,enum MAAT_OPERATION op) +int Maat_cmd_append(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPERATION op) { _Maat_feather_t* _feather=(_Maat_feather_t*)feather; struct _Maat_cmd_inner_t* _cmd=(struct _Maat_cmd_inner_t*)cmd; @@ -829,7 +1047,7 @@ int Maat_append_command(Maat_feather_t feather,struct Maat_command_t* cmd,enum M return 0; } -int Maat_commit_command(Maat_feather_t feather) +int Maat_cmd_commit(Maat_feather_t feather) { _Maat_feather_t* _feather=(_Maat_feather_t*)feather; @@ -837,7 +1055,7 @@ int Maat_commit_command(Maat_feather_t feather) long long maat_redis_version=0; int new_region_num=0,new_group_num=0; int serial_rule_num=0,serial_rule_idx=0; - int redis_transaction_failed=1; + int transection_success=1; struct _Maat_cmd_inner_t* p=NULL,*n=NULL; redisContext* ctx=NULL; @@ -870,17 +1088,20 @@ int Maat_commit_command(Maat_feather_t feather) serial_rule_num+=calculate_serial_rule_num(p, &new_region_num, &new_group_num); p=p->next; } + _feather->server_time=redis_server_time(ctx); - data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num); - assert(data_reply->type==REDIS_REPLY_INTEGER); - _feather->base_rgn_seq=data_reply->integer-new_region_num; - freeReplyObject(data_reply); - - data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num); - assert(data_reply->type==REDIS_REPLY_INTEGER); - _feather->base_rgn_seq=data_reply->integer-new_group_num; - freeReplyObject(data_reply); - + if(feather->AUTO_NUMBERING_ON==1) + { + data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num); + assert(data_reply->type==REDIS_REPLY_INTEGER); + _feather->base_rgn_seq=data_reply->integer-new_region_num; + freeReplyObject(data_reply); + + data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num); + assert(data_reply->type==REDIS_REPLY_INTEGER); + _feather->base_grp_seq=data_reply->integer-new_group_num; + freeReplyObject(data_reply); + } s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t),serial_rule_num); for(i=0,p=_feather->cmd_qhead;i<_feather->cmd_q_cnt;i++) @@ -889,74 +1110,11 @@ int Maat_commit_command(Maat_feather_t feather) p=p->next; } assert(serial_rule_idx==serial_rule_num); - redis_transaction_failed=1; - while(redis_transaction_failed) + transection_success=0; + while(!transection_success) { - data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); - freeReplyObject(data_reply); - data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); - maat_redis_version=read_redis_integer(data_reply); - maat_redis_version++; - freeReplyObject(data_reply); - data_reply=_wrap_redisCommand(ctx,"MULTI"); - freeReplyObject(data_reply); - append_cmd_cnt=0; - for(i=0;icmd_q_cnt;i++) { n=p->next; - Maat_free_command((struct Maat_command_t* )p); + Maat_free_cmd((struct Maat_cmd_t* )p); p=n; } _feather->cmd_qhead=_feather->cmd_qtail=NULL; @@ -981,4 +1139,31 @@ error_out: free(s_rule); return ret; } +long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment) +{ + _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + redisReply* data_reply=NULL; + long long result=0; + data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"INCRBY %s %d", key, increment); + assert(data_reply->type==REDIS_REPLY_INTEGER); + result=data_reply->integer; + freeReplyObject(data_reply); + return result; +} +int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, int size) +{ + _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + redisReply* data_reply=NULL; + unsigned int i=0; + data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"ZRANGEBYSCORE %s %d %d" + ,rm_label_sset + ,label_id + ,label_id); + for(i=0;ielement&&ielement[i]->integer); + } + freeReplyObject(data_reply); + return i; +} diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index e979c93..96cbd0d 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -1010,12 +1010,12 @@ struct _Maat_scanner_t* create_maat_scanner(unsigned int version,_Maat_feather_t struct _Maat_scanner_t* scanner=NULL; scanner=(struct _Maat_scanner_t*)calloc(sizeof(struct _Maat_scanner_t),1); - //Function Maat_append_command will access compile_hash in user thread. + //Function Maat_cmd_append will access compile_hash in user thread. hargs.thread_safe=1; scanner->compile_hash=MESA_htable_create(&hargs, sizeof(hargs)); MESA_htable_print_crtl(scanner->compile_hash,0); - hargs.thread_safe=0; + hargs.thread_safe=1; hargs.data_free=EMPTY_FREE; scanner->group_hash=MESA_htable_create(&hargs, sizeof(hargs)); MESA_htable_print_crtl(scanner->group_hash,0); diff --git a/src/entry/Maat_rule_internal.h b/src/entry/Maat_rule_internal.h index 110f310..c8d55d2 100644 --- a/src/entry/Maat_rule_internal.h +++ b/src/entry/Maat_rule_internal.h @@ -376,6 +376,8 @@ struct _Maat_feather_t char redis_ip[MAX_TABLE_NAME_LEN]; int redis_port; + int redis_index; + int AUTO_NUMBERING_ON; struct timeval connect_timeout; redisContext *redis_read_ctx; redisContext *redis_write_ctx; // not thread safe. @@ -383,7 +385,7 @@ struct _Maat_feather_t int cmd_q_cnt; struct _Maat_cmd_inner_t* cmd_qhead, *cmd_qtail; pthread_mutex_t redis_write_lock; //protect redis_write_ctx - long long base_rgn_seq,base_grp_seq; + long long base_rgn_seq,base_grp_seq,server_time; //for stat>>>> screen_stat_handle_t stat_handle; int total_stat_id; diff --git a/test/maat_test.cpp b/test/maat_test.cpp index a6713ef..d1c26b9 100644 --- a/test/maat_test.cpp +++ b/test/maat_test.cpp @@ -466,7 +466,7 @@ int test_command(Maat_feather_t feather) int table_id; scan_status_t mid=NULL; - struct Maat_command_t* cmd=NULL; + struct Maat_cmd_t* cmd=NULL; struct Maat_rule_t rule; struct Maat_rule_t result; struct Maat_region_t region; @@ -474,8 +474,8 @@ int test_command(Maat_feather_t feather) memset(&rule,0,sizeof(rule)); rule.config_id=201; strcpy(rule.service_defined,"maat_command"); - //MUST acqire by function, because Maat_command_t has some hidden members. - cmd=Maat_create_comand(&rule, group_num); + //MUST acqire by function, because Maat_cmd_t has some hidden members. + cmd=Maat_create_cmd(&rule, group_num); memset(®ion,0,sizeof(region)); region.region_type=REGION_EXPR; region.table_name=table_name; @@ -484,18 +484,18 @@ int test_command(Maat_feather_t feather) region.expr_rule.expr_type=EXPR_TYPE_AND; region.expr_rule.match_method=MATCH_METHOD_SUB; region.expr_rule.hex_bin=UNCASE_PLAIN; - Maat_cmd_add_region(cmd, 0, ®ion); + Maat_add_region2cmd(cmd, 0, ®ion); //use pipeline model. - ret=Maat_append_command(feather, cmd, MAAT_OP_ADD); + ret=Maat_cmd_append(feather, cmd, MAAT_OP_ADD); if(ret<0) { printf("Add Maat command %d failed.\n",rule.config_id); - Maat_free_command(cmd); + Maat_free_cmd(cmd); return 0; } //cmd has been saved in feather, so free before commit is allowed. - Maat_free_command(cmd); - ret=Maat_commit_command(feather); + Maat_free_cmd(cmd); + ret=Maat_cmd_commit(feather); if(ret<0) { printf("Commit Maat command %d failed.\n",rule.config_id); @@ -519,15 +519,15 @@ int test_command(Maat_feather_t feather) memset(&rule,0,sizeof(rule)); rule.config_id=201; - cmd=Maat_create_comand(&rule, 0); - ret=Maat_command(feather, cmd, MAAT_OP_DEL); + cmd=Maat_create_cmd(&rule, 0); + ret=Maat_cmd(feather, cmd, MAAT_OP_DEL); if(ret<0) { printf("Delete Maat command %d failed.\n",rule.config_id); - Maat_free_command(cmd); + Maat_free_cmd(cmd); return 0; } - Maat_free_command(cmd); + Maat_free_cmd(cmd); sleep(1);//waiting for commands go into effect ret=Maat_full_scan_string(feather, table_id,CHARSET_GBK, scan_data, strlen(scan_data), &result,NULL, 1, @@ -559,6 +559,7 @@ int main(int argc,char* argv[]) const char* redis_ip="127.0.0.1"; unsigned short redis_port=6379; int scan_detail=0; + int using_redis=0; scan_status_t mid=NULL; int wait_second=400; void *logger=MESA_create_runtime_log_handle(log_file,0); @@ -577,6 +578,7 @@ int main(int argc,char* argv[]) { Maat_set_feather_opt(feather, MAAT_OPT_REDIS_IP, redis_ip, strlen(redis_ip)+1); Maat_set_feather_opt(feather, MAAT_OPT_REDIS_PORT, &redis_port, sizeof(redis_port)); + using_redis=1; } else { @@ -629,9 +631,10 @@ int main(int argc,char* argv[]) test_table_conjunction(feather, "HTTP_URL", "HTTP_HOST", &mid); Maat_clean_status(&mid); - - test_command(feather); - + if(1==using_redis) + { + test_command(feather); + } sleep(wait_second); Maat_burn_feather(feather);