diff --git a/inc/Maat_command.h b/inc/Maat_command.h index 271ab85..43f226b 100644 --- a/inc/Maat_command.h +++ b/inc/Maat_command.h @@ -104,7 +104,7 @@ struct Maat_region_t struct Maat_group_t { int region_num; - int group_id; //If MAAT_OPT_CMD_AUTO_NUMBERING==1, maat will assigned one. Or users must appoint a unique number. + int group_id; //If MAAT_OPT_CMD_AUTO_NUMBERING==1, maat will assigned one. Or users must assign a unique number. struct Maat_region_t *regions; }; struct Maat_cmd_t @@ -113,9 +113,17 @@ struct Maat_cmd_t int group_num; // for MAAT_OP_DEL, set to 0. int expire_after; //expired after $timeout$ seconds, set to 0 for never timeout. int label_id; //>0, for Maat_cmd_select - struct Maat_group_t* groups;// for MAAT_OP_DEL, set to NULL. + struct Maat_group_t* groups;// Add regions with Maat_add_region2cmd }; -struct Maat_cmd_t* Maat_create_cmd(const struct Maat_rule_t* rule, int group_num, const char* label); +struct Maat_line_t +{ + const char* table_name; + const char* table_line; + int rule_id; + int label_id; + int expire_after; //expired after $timeout$ seconds, set to 0 for never timeout. +}; +struct Maat_cmd_t* Maat_create_cmd(const struct Maat_rule_t* rule, int group_num); void Maat_add_region2cmd(struct Maat_cmd_t* cmd,int which_group,const struct Maat_region_t* region); void Maat_free_cmd(struct Maat_cmd_t* cmd); @@ -130,10 +138,10 @@ int Maat_cmd_commit(Maat_feather_t feather); int Maat_cmd_set_group(Maat_feather_t feather, int group_id, const struct Maat_region_t* region, enum MAAT_OPERATION op); -int Maat_cmd_set_line(Maat_feather_t feather, const char* table_name,int rule_id, const char* line, int timeout, enum MAAT_OPERATION op); +int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule, enum MAAT_OPERATION op); //Return the value of key after the increment. long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment); -int Maat_cmd_select(Maat_feather_t feather, int label, int * output_ids, int size); +int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size); #endif diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index b1aacea..63b01f7 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -49,6 +49,20 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) va_end(ap); return (redisReply *)reply; } +int connect_redis_for_write(_Maat_feather_t * feather) +{ + int ret=0; + assert(feather->redis_write_ctx==NULL); + feather->redis_write_ctx=redisConnectWithTimeout(feather->redis_ip, feather->redis_port,feather->connect_timeout); + if(feather->redis_write_ctx==NULL) + { + MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module + ,"Redis connect %s:%d for write failed." + ,feather->redis_ip,feather->redis_port); + ret=-1; + } + return ret; +} long long read_redis_integer(const redisReply* reply) { switch(reply->type) @@ -74,8 +88,8 @@ long long redis_server_time(redisContext* ctx) long long server_time=0; redisReply* data_reply=NULL; data_reply=_wrap_redisCommand(ctx,"TIME"); - assert(data_reply->type==REDIS_REPLY_INTEGER); - server_time=data_reply->integer; + assert(data_reply->type==REDIS_REPLY_ARRAY); + server_time=atoll(data_reply->element[0]->str); freeReplyObject(data_reply); return server_time; } @@ -167,7 +181,7 @@ void invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq) } int del_rule_from_redis(redisContext* ctx, struct serial_rule_t* s_rule, long long new_version) { - int append_cmd_cnt=0£» + int append_cmd_cnt=0; redisAppendCommand(ctx,"RENAME %s:%s,%d %s:%s,%d" ,rm_key_prefix[MAAT_OP_ADD] ,s_rule->table_name @@ -543,11 +557,11 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st } snprintf(line,sizeof(line),"%d\t%d\t1",p_group->group_id ,p_m_rule->config_id); - set_serial_rule(list+rule_num,MAAT_OP_ADD,p_group->group_id,0,feather->group_tn,line); + set_serial_rule(list+rule_num,MAAT_OP_ADD,p_group->group_id,0,feather->group_tn,line,timeout); } else { - set_serial_rule(list+rule_num,MAAT_OP_DEL,p_group->group_id,0,feather->group_tn,NULL); + set_serial_rule(list+rule_num,MAAT_OP_DEL,p_group->group_id,0,feather->group_tn,NULL,0); } rule_num++; if(p_group->regions==NULL)//group reuse. @@ -566,12 +580,12 @@ int build_serial_rule(_Maat_feather_t *feather,struct _Maat_cmd_inner_t* _cmd,st } serialize_region(p_region, p_group->group_id, line, sizeof(line)); set_serial_rule(list+rule_num,MAAT_OP_ADD - ,p_region->region_id,0,p_region->table_name,line); + ,p_region->region_id,0,p_region->table_name,line,0); } else { set_serial_rule(list+rule_num,MAAT_OP_DEL - ,p_region->region_id,0,p_region->table_name,NULL); + ,p_region->region_id,0,p_region->table_name,NULL,0); } rule_num++; @@ -591,6 +605,74 @@ int mr_transaction_success(redisReply* data_reply) return 1; } } +int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num) +{ + int append_cmd_cnt=0,i=0; + long long maat_redis_version=0; + redisReply* data_reply=NULL; + int redis_transaction_success=1; + data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); + freeReplyObject(data_reply); + data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); + maat_redis_version=read_redis_integer(data_reply); + maat_redis_version++; + freeReplyObject(data_reply); + data_reply=_wrap_redisCommand(ctx,"MULTI"); + freeReplyObject(data_reply); + append_cmd_cnt=0; + for(i=0;i0) + { + redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset + ,s_rule[i].timeout + ,s_rule[i].table_name + ,s_rule[i].rule_id); + append_cmd_cnt++; + } + if(s_rule[i].label_id>0) + { + redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset + ,s_rule[i].label_id + ,s_rule[i].rule_id); + append_cmd_cnt++; + } + } + else + { + append_cmd_cnt+=del_rule_from_redis(ctx,s_rule+i,maat_redis_version); + } + + } + redisAppendCommand(ctx,"INCRBY MAAT_VERSION 1"); + append_cmd_cnt++; + redisAppendCommand(ctx,"EXEC"); + append_cmd_cnt++; + redis_transaction_success=1; + for(i=0;ielement[0].integer; + server_time=data_reply->element[0]->integer; freeReplyObject(data_reply); data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_expire_sset,server_time); - if(data_reply->type!=REDIS_REPLY_ARRAY) + if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0) { freeReplyObject(data_reply); return; } s_rule_num=data_reply->elements; - s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t)*s_rule_num); + s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t),s_rule_num); for(i=0;ielement[i].str,"%[^,],%d",s_rule[i].table_name,&(s_rule.rule_id)); + ret=sscanf(data_reply->element[i]->str,"%[^,],%d",s_rule[i].table_name,&(s_rule[i].rule_id)); assert(ret==2); } + freeReplyObject(data_reply); is_success=exec_serial_rule(ctx,s_rule, s_rule_num); if(is_success==1) @@ -743,74 +826,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c, rule_list=NULL; return; } -int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num) -{ - int append_cmd_cnt=0,i=0; - long long maat_redis_version=0; - redisReply* data_reply=NULL; - int redis_transaction_success=1; - data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); - freeReplyObject(data_reply); - data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); - maat_redis_version=read_redis_integer(data_reply); - maat_redis_version++; - freeReplyObject(data_reply); - data_reply=_wrap_redisCommand(ctx,"MULTI"); - freeReplyObject(data_reply); - append_cmd_cnt=0; - for(i=0;i0) - { - redisAppendCommand(ctx,"ZADD %s NX %lld %s,%d",rm_expire_sset - ,s_rule[i].timeout - ,s_rule[i].table_name - ,s_rule[i].rule_id); - append_cmd_cnt++; - } - if(s_rule[i].label_id>0) - { - redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset - ,s_rule[i].label_id - ,s_rule[i].rule_id); - append_cmd_cnt++; - } - } - else - { - append_cmd_cnt+=del_rule_from_redis(ctx,s_rule+i,maat_redis_version); - } - } - redisAppendCommand(ctx,"INCRBY MAAT_VERSION 1"); - append_cmd_cnt++; - redisAppendCommand(ctx,"EXEC"); - append_cmd_cnt++; - redis_transaction_success=1; - for(i=0;iAUTO_NUMBERING_ON==1) - { - return -1; - } - ret=map_str2int(_feather->map_tablename2id, table_name, &table_id); + ret=map_str2int(_feather->map_tablename2id, line_rule->table_name, &table_id); if(ret<0) { - MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,"Command set line id %d failed: unknown table %s." - ,rule_id - ,table_name); + , line_rule->rule_id + , line_rule->table_name); return -1; } - if(TABLE_TYPE_PLUGIN!=feather->p_table_info[table_id]->table_type) + if(TABLE_TYPE_PLUGIN!=_feather->p_table_info[table_id]->table_type) { - MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,"Command set line id %d failed: table %s is not a plugin table." - ,rule_id - ,table_name); + , line_rule->rule_id + , line_rule->table_name); return -1; } - if(timeout>0) + if( line_rule->expire_after>0) { absolute_expire_time=redis_server_time(_feather->redis_write_ctx); - absolute_expire_time+=timeout; + absolute_expire_time+=line_rule->expire_after; } - set_serial_rule(&s_rule, op, rule_id,table_name,line, absolute_expire_time); + set_serial_rule(&s_rule, op,line_rule->rule_id,line_rule->label_id,line_rule->table_name,line_rule->table_line, absolute_expire_time); ret=0; while(!ret) { @@ -1019,6 +1031,7 @@ int Maat_cmd_append(Maat_feather_t feather,struct Maat_cmd_t* cmd,enum MAAT_OPER int ret=0; _cmd->op=op; assert(op==MAAT_OP_DEL||op==MAAT_OP_ADD); + assert(_cmd->next==NULL); if(op==MAAT_OP_DEL) { ret=reconstruct_cmd(_feather, _cmd); @@ -1051,8 +1064,7 @@ int Maat_cmd_commit(Maat_feather_t feather) { _Maat_feather_t* _feather=(_Maat_feather_t*)feather; - int ret=0,i=0,retry=0,append_cmd_cnt=0; - long long maat_redis_version=0; + int ret=0,i=0,retry=0; int new_region_num=0,new_group_num=0; int serial_rule_num=0,serial_rule_idx=0; int transection_success=1; @@ -1072,13 +1084,9 @@ int Maat_cmd_commit(Maat_feather_t feather) } if(_feather->redis_write_ctx==NULL) { - _feather->redis_write_ctx=redisConnectWithTimeout(_feather->redis_ip, _feather->redis_port,_feather->connect_timeout); - if(_feather->redis_write_ctx==NULL) + ret=connect_redis_for_write(_feather); + if(ret!=0) { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module - ,"Redis connect %s:%d failed when appending command." - ,_feather->redis_ip,_feather->redis_port); - ret=-1; goto error_out; } } @@ -1090,7 +1098,7 @@ int Maat_cmd_commit(Maat_feather_t feather) } _feather->server_time=redis_server_time(ctx); - if(feather->AUTO_NUMBERING_ON==1) + if(_feather->AUTO_NUMBERING_ON==1) { data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num); assert(data_reply->type==REDIS_REPLY_INTEGER); @@ -1144,24 +1152,42 @@ long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment) _Maat_feather_t* _feather=(_Maat_feather_t*)feather; redisReply* data_reply=NULL; long long result=0; + int ret=0; + if(_feather->redis_write_ctx==NULL) + { + ret=connect_redis_for_write(_feather); + if(ret!=0) + { + return -1; + } + } data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"INCRBY %s %d", key, increment); assert(data_reply->type==REDIS_REPLY_INTEGER); result=data_reply->integer; freeReplyObject(data_reply); return result; } -int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, int size) +int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size) { _Maat_feather_t* _feather=(_Maat_feather_t*)feather; redisReply* data_reply=NULL; unsigned int i=0; + int ret=0; + if(_feather->redis_write_ctx==NULL) + { + ret=connect_redis_for_write(_feather); + if(ret!=0) + { + return -1; + } + } data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"ZRANGEBYSCORE %s %d %d" ,rm_label_sset ,label_id ,label_id); - for(i=0;ielement&&ielements&&ielement[i]->integer); + output_ids[i]=atoi(data_reply->element[i]->str); } freeReplyObject(data_reply); return i; diff --git a/test/maat_json.json b/test/maat_json.json index 3384247..2bdc887 100644 --- a/test/maat_json.json +++ b/test/maat_json.json @@ -384,9 +384,9 @@ { "table_name": "QD_ENTRY_INFO", "table_content": [ - "1\t192.168.0.1\t101", - "2\t192.168.0.2\t101", - "3\t192.168.1.1\t102" + "1\t192.168.0.1\t101\t1", + "2\t192.168.0.2\t101\t1", + "3\t192.168.1.1\t102\t1" ] }, { diff --git a/test/maat_test.cpp b/test/maat_test.cpp index d1c26b9..0cf2a4c 100644 --- a/test/maat_test.cpp +++ b/test/maat_test.cpp @@ -28,12 +28,20 @@ void Maat_read_entry_cb(int table_id,const char* table_line,void* u_para) char ip_str[16]={0}; int entry_id=-1,seq=-1; unsigned int ip_uint=0; + int is_valid=0; unsigned int local_ip_nr=16820416;//192.168.0.1 - sscanf(table_line,"%d\t%s\t%d",&seq,ip_str,&entry_id); + sscanf(table_line,"%d\t%s\t%d\t%d",&seq,ip_str,&entry_id,&is_valid); inet_pton(AF_INET,ip_str,&ip_uint); if(local_ip_nr==ip_uint) { - printf("Load entry id %d SUCCESS.\n",entry_id); + if(is_valid==1) + { + printf("Load entry id %d success.\n",entry_id); + } + else + { + printf("Offload entry id %d success.\n",entry_id); + } } return; } @@ -459,28 +467,43 @@ int test_table_conjunction(Maat_feather_t feather,const char* table_name,const c } return 0; } -int test_command(Maat_feather_t feather) +void test_set_cmd_line(Maat_feather_t feather) +{ + struct Maat_line_t line_rule; + int ret=0; + memset(&line_rule,0,sizeof(line_rule)); + line_rule.label_id=0; + line_rule.rule_id=(int)Maat_cmd_incrby(feather,"TEST_PLUG_SEQ", 1); + line_rule.table_name="QD_ENTRY_INFO"; + line_rule.table_line="1\t192.168.0.1\t101\t1"; + line_rule.expire_after=0; + ret=Maat_cmd_set_line(feather, &line_rule, MAAT_OP_ADD); + assert(ret==0); + sleep(1); + ret=Maat_cmd_set_line(feather, &line_rule, MAAT_OP_DEL); + assert(ret==0); + return; +} +int test_add_command(Maat_feather_t feather,const char* region_table,int config_id, int timeout,int label_id, const char* keywords) { - const char* scan_data="Hiredis is a minimalistic C client library for the Redis database.\r\n"; - const char* table_name="HTTP_URL"; - int table_id; - scan_status_t mid=NULL; - struct Maat_cmd_t* cmd=NULL; struct Maat_rule_t rule; - struct Maat_rule_t result; + struct Maat_region_t region; int group_num=1,ret=0; memset(&rule,0,sizeof(rule)); - rule.config_id=201; + rule.config_id=config_id; + strcpy(rule.service_defined,"maat_command"); //MUST acqire by function, because Maat_cmd_t has some hidden members. cmd=Maat_create_cmd(&rule, group_num); + cmd->expire_after=timeout; + cmd->label_id=label_id; memset(®ion,0,sizeof(region)); region.region_type=REGION_EXPR; - region.table_name=table_name; + region.table_name=region_table; region.expr_rule.district=NULL; - region.expr_rule.keywords="Hiredis&C\\bClient"; + region.expr_rule.keywords=keywords; region.expr_rule.expr_type=EXPR_TYPE_AND; region.expr_rule.match_method=MATCH_METHOD_SUB; region.expr_rule.hex_bin=UNCASE_PLAIN; @@ -493,41 +516,71 @@ int test_command(Maat_feather_t feather) Maat_free_cmd(cmd); return 0; } - //cmd has been saved in feather, so free before commit is allowed. + //cmd has been saved in feather, so free cmd before commit is allowed. Maat_free_cmd(cmd); ret=Maat_cmd_commit(feather); if(ret<0) { printf("Commit Maat command %d failed.\n",rule.config_id); - return 0; } + return 0; + +} +int test_del_command(Maat_feather_t feather,int config_id) +{ + struct Maat_cmd_t* cmd=NULL; + struct Maat_rule_t rule; + int ret=0; + memset(&rule,0,sizeof(rule)); + rule.config_id=config_id; + cmd=Maat_create_cmd(&rule, 0); + ret=Maat_cmd(feather, cmd, MAAT_OP_DEL); + if(ret<0) + { + printf("Delete Maat command %d failed.\n",rule.config_id); + } + Maat_free_cmd(cmd); + return 0; +} +void test_command(Maat_feather_t feather) +{ + const char* scan_data="Hiredis is a minimalistic C client library for the Redis database.\r\n"; + const char* table_name="HTTP_URL"; + const char* keywords="Hiredis&C\\bClient"; + scan_status_t mid=NULL; + int config_id=-1, table_id=0, ret=0; + int output_ids[4]; + int output_id_cnt=0; + struct Maat_rule_t result; + int timeout=0;//seconds + int label_id=5210; + config_id=(int)Maat_cmd_incrby(feather, "TEST_SEQ", 1); + test_add_command(feather,table_name,config_id, 0, label_id, keywords); sleep(1);//waiting for commands go into effect table_id=Maat_table_register(feather,table_name); ret=Maat_full_scan_string(feather, table_id,CHARSET_GBK, scan_data, strlen(scan_data), &result,NULL, 1, &mid, 0); - if(ret>0&&result.config_id==rule.config_id) + if(ret>0&&result.config_id==config_id) { - printf("Test Maat add command Success %s\n",print_maat_result(&result,ret)); + printf("Test Maat add command success %s\n",print_maat_result(&result,ret)); } else { printf("Test Maat add command failed.\n"); } Maat_clean_status(&mid); - - memset(&rule,0,sizeof(rule)); - rule.config_id=201; - cmd=Maat_create_cmd(&rule, 0); - ret=Maat_cmd(feather, cmd, MAAT_OP_DEL); - if(ret<0) + output_id_cnt=Maat_cmd_select(feather,label_id, output_ids, 4); + if(output_id_cnt==1&&output_ids[0]==config_id) { - printf("Delete Maat command %d failed.\n",rule.config_id); - Maat_free_cmd(cmd); - return 0; + printf("Test Maat select command success.\n"); } - Maat_free_cmd(cmd); + else + { + printf("Test Maat select command label %d failed.\n",label_id); + } + test_del_command(feather, config_id); sleep(1);//waiting for commands go into effect ret=Maat_full_scan_string(feather, table_id,CHARSET_GBK, scan_data, strlen(scan_data), &result,NULL, 1, @@ -538,11 +591,24 @@ int test_command(Maat_feather_t feather) } else { - printf("Test Maat delete command Success.\n"); + printf("Test Maat delete command success.\n"); } Maat_clean_status(&mid); - return 0; + timeout=1; + test_add_command(feather,table_name,config_id, timeout, label_id, keywords); + sleep(timeout+1); + ret=Maat_full_scan_string(feather, table_id,CHARSET_GBK, scan_data, strlen(scan_data), + &result,NULL, 1, + &mid, 0); + if(ret>0&&result.config_id==config_id)//should not hit + { + printf("Test Maat command timeout failed."); + } + else + { + printf("Test Maat command timeout success.\n"); + } } int main(int argc,char* argv[]) { @@ -634,6 +700,7 @@ int main(int argc,char* argv[]) if(1==using_redis) { test_command(feather); + test_set_cmd_line(feather); } sleep(wait_second);