diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index ba9a44c..65d9fe9 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -16,7 +16,7 @@ const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"}; const char* rm_status_sset="MAAT_UPDATE_STATUS"; const char* rm_expire_sset="MAAT_EXPIRE_TIMER"; const char* rm_label_sset="MAAT_LABEL_INDEX"; -const char* rm_version_sset="/AAT_VERSION_TIMER"; +const char* rm_version_sset="MAAT_VERSION_TIMER"; const static int MAAT_REDIS_SYNC_TIME=30*60; struct serial_rule_t //rm= Redis Maat @@ -370,7 +370,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** if(reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR) { MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"GET MAAT_VERSION failed, maybe Redis is busy."); - return 0; + return -1; } } else @@ -380,7 +380,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, "GET MAAT_VERSION failed %s. Reconnecting...",err_buff); _wrap_redisReconnect(c,logger); - return 0; + return -1; } version_in_redis=read_redis_integer(reply); assert(version_in_redis>=version); @@ -405,14 +405,14 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** __redis_strerror_r(errno,err_buff,sizeof(err_buff)); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, "GET %s failed %s.",rm_status_sset,err_buff); - return 0; + return -1; } assert(reply->type==REDIS_REPLY_ARRAY); if(reply->elements==0) { MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%d %d",rm_status_sset,version,version_in_redis); freeReplyObject(reply); - return 0; + return -1; } tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str); if(tmp_reply->type!=REDIS_REPLY_STRING) @@ -732,7 +732,27 @@ int mr_transaction_success(redisReply* data_reply) return 1; } } +int _exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time); int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time) +{ + int max_redis_batch=8*1024,batch_cnt=0; + int success_cnt=0,ret=0; + while(success_cntlogger; @@ -968,27 +988,30 @@ void redis_monitor_traverse(unsigned int version,redisContext *c, cleanup_update_status(feather->redis_read_ctx, logger); } rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type); - if(rule_num==0) + if(rule_num<0||(rule_num==0&&update_type==CM_UPDATE_TYPE_INC))//error or nothing changed { return; } - ret=get_rm_value(c,rule_list,rule_num, logger); - if(ret<0) + if(rule_num>0) { - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update."); - goto clean_up; + ret=get_rm_value(c,rule_list,rule_num, logger); + if(ret<0) + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update."); + goto clean_up; + } } start(new_version,update_type,u_para); for(i=0;imap_tablename2id,rule_list[i].table_name,&table_id); + if(ret<0)//Unrecognized table. + { + continue; + } + table_type=feather->p_table_info[table_id]->table_type; if(rule_list[i].op==MAAT_OP_DEL) { - ret=map_str2int(feather->map_tablename2id,rule_list[i].table_name,&table_id); - if(ret<0)//Unrecognized table. - { - continue; - } - table_type=feather->p_table_info[table_id]->table_type; invalidate_line(rule_list[i].table_line,table_type,feather->p_table_info[table_id]->valid_flag_column); } update(rule_list[i].table_name,rule_list[i].table_line,u_para); @@ -1134,7 +1157,7 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule } set_serial_rule(&s_rule, op,line_rule->rule_id,line_rule->label_id,line_rule->table_name,line_rule->table_line, absolute_expire_time); ret=0; - while(!ret) + while(ret==0) { ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1,server_time); retry++; @@ -1303,10 +1326,10 @@ int Maat_cmd_commit(Maat_feather_t feather) } assert(serial_rule_idx==serial_rule_num); transection_success=0; - while(!transection_success) + while(transection_successserver_time); - if(transection_success!=1) + transection_success+=exec_serial_rule(ctx, s_rule+transection_success,serial_rule_num-transection_success,_feather->server_time); + if(transection_successtag!=NULL); if(_handle->input_format == GIE_INPUT_FORMAT_SFH) { grab_ret = sfh_grab_key_set(digests[i]->sfh,digests[i]->sfh_length,i,gram_value,&gram_cnt,to_process_list); @@ -720,6 +721,7 @@ void copy_idtable_item_iterate(const uchar * key, uint size, void * data, void * struct id_table_data * id_data = (struct id_table_data *)data; struct htable_handle * htable_para = (struct htable_handle *)user; struct id_table_data * id_data_copy = (struct id_table_data *)calloc(1, sizeof(struct id_table_data)); + assert(id_data->tag!=NULL); memcpy(id_data_copy,id_data,sizeof(struct id_table_data)); id_data_copy->sfh = (char *)calloc(id_data_copy->sfh_length,sizeof(char)); memcpy(id_data_copy->sfh,id_data->sfh,id_data_copy->sfh_length);