diff --git a/inc/Maat_rule.h b/inc/Maat_rule.h index b4d3ca9..e52496d 100644 --- a/inc/Maat_rule.h +++ b/inc/Maat_rule.h @@ -150,9 +150,10 @@ enum MAAT_INIT_OPT MAAT_OPT_REDIS_IP, //VALUE is a const char*, MUST end with '\0', SIZE= strlen(string+'\0')+1. No DEFAULT. MAAT_OPT_REDIS_PORT, //VALUE is a unsigned short or a signed int, host order, SIZE= sizeof(unsigned short) or sizeof(int). No DEFAULT. MAAT_OPT_REDIS_INDEX, //VALUE is interger *, 0~15, SIZE=sizeof(int). DEFAULT: 0. - MAAT_OPT_CMD_AUTO_NUMBERING, //VALUE is interger *, 1 or 0, SIZE=sizeof(int). DEFAULT: 1. - MAAT_OPT_DEFERRED_LOAD, //VALUE is NULL,SIZE is 0. Default: Deffered initialization OFF. - MAAT_OPT_CUMULATIVE_UPDATE_OFF //VALUE is NULL,SIZE is 0. Default: CUMMULATIVE UPDATE ON. + MAAT_OPT_CMD_AUTO_NUMBERING, //VALUE is a interger *, 1 or 0, SIZE=sizeof(int). DEFAULT: 1. + MAAT_OPT_DEFERRED_LOAD, //VALUE is NULL,SIZE is 0. Default: Deffered initialization OFF. + MAAT_OPT_CUMULATIVE_UPDATE_OFF, //VALUE is NULL,SIZE is 0. Default: CUMMULATIVE UPDATE ON. + MAAT_OPT_LOAD_SPECIFIC_VERSION //VALUE is a long long, SIZE=sizeof(long long). Default: Load the Latest. Only valid in redis mode, and maybe failed for too old. }; //return -1 if failed, return 0 on success; int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const void* value,int size); diff --git a/src/entry/Maat_api.cpp b/src/entry/Maat_api.cpp index 0572763..dec831f 100644 --- a/src/entry/Maat_api.cpp +++ b/src/entry/Maat_api.cpp @@ -621,6 +621,10 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo break; case MAAT_OPT_CUMULATIVE_UPDATE_OFF: _feather->cumulative_update_off=1; + break; + case MAAT_OPT_LOAD_SPECIFIC_VERSION: + _feather->load_from_specific_version=*((long long*)value); + break; default: return -1; } diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index c4bd47c..c695bb9 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -325,16 +325,162 @@ int _wrap_redisReconnect(redisContext* c, void*logger) return -1; } } -int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** list,void* logger, long long* new_version,int *update_type, int cumulative_off) +int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger) { - redisReply* reply=NULL,*sub_reply=NULL,*tmp_reply=NULL; + redisReply* reply=NULL,*tmp_reply=NULL; + char err_buff[256], op_str[4]; + int rule_num=0,ret=0; + unsigned int i=0; + long long nearest_rule_version; + struct serial_rule_t *s_rule=NULL; + + //Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version. + //The elements are considered to be ordered from low to high scores(instance_version). + reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,instance_version,target_version); + + if(reply==NULL) + { + __redis_strerror_r(errno,err_buff,sizeof(err_buff)); + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "GET %s failed %s.",rm_status_sset,err_buff); + return -1; + } + assert(reply->type==REDIS_REPLY_ARRAY); + rule_num=reply->elements; + if(reply->elements==0) + { + return 0; + } + + tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str); + if(tmp_reply->type!=REDIS_REPLY_STRING) + { + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,instance_version, target_version); + freeReplyObject(tmp_reply); + freeReplyObject(reply); + return -1; + } + nearest_rule_version=read_redis_integer(tmp_reply); + freeReplyObject(tmp_reply); + tmp_reply=NULL; + if(nearest_rule_version!=instance_version+1) + { + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,instance_version); + + return -1; + } + s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); + for(i=0;ielements;i++) + { + assert(reply->element[i]->type==REDIS_REPLY_STRING); + ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%d",op_str,s_rule[i].table_name,&(s_rule[i].rule_id)); + assert(ret==3); + if(strncmp(op_str,"ADD",strlen("ADD"))==0) + { + s_rule[i].op=MAAT_OP_ADD; + } + else if(strncmp(op_str,"DEL",strlen("DEL"))==0) + { + s_rule[i].op=MAAT_OP_DEL; + } + else + { + assert(0); + } + } + rule_num=reply->elements; + *list=s_rule; + freeReplyObject(reply); + return rule_num; +} +struct s_rule_array_t +{ + int cnt; + int size; + struct serial_rule_t* array; +}; +void save_serial_rule_cb(const uchar * key, uint size, void * data, void * user) +{ + struct s_rule_array_t* array=(struct s_rule_array_t*)user; + int i=array->cnt; + memcpy(&(array->array[i]),data,sizeof(struct serial_rule_t)); + array->array[i].op=MAAT_OP_ADD; + return; +} +int recovery_history_version(const struct serial_rule_t* current, int current_num, const struct serial_rule_t* changed, int changed_num, struct serial_rule_t** history_result) +{ + int i=0,ret=0; + unsigned int history_num=0; + int hash_slot_size=1; + MESA_htable_handle htable=NULL; + MESA_htable_create_args_t hargs; + struct s_rule_array_t tmp_array; + char hkey[256+20]; + int tmp=current_num+changed_num; + for(;tmp>0;tmp=tmp/2) + { + hash_slot_size*=2; + } + + memset(&hargs,0,sizeof(hargs)); + hargs.thread_safe=0; + hargs.hash_slot_size = hash_slot_size; + hargs.max_elem_num = 0; + hargs.eliminate_type = HASH_ELIMINATE_ALGO_FIFO; + hargs.expire_time = 0; + hargs.key_comp = NULL; + hargs.key2index = NULL; + hargs.recursive = 1; + hargs.data_free = NULL;//data is an reference, no need to free. + hargs.data_expire_with_condition = NULL; + htable=MESA_htable_create(&hargs, sizeof(hargs)); + MESA_htable_print_crtl(htable, 0); + + for(i=0;i0); + } + + for(i=changed_num-1;i>=0;i--) + { + snprintf(hkey,sizeof(hkey),"%d,%s",changed[i].rule_id,changed[i].table_name); + if(changed[i].op==MAAT_OP_ADD)//newly added rule is need to delete from current, so that history version can be recovered. + { + ret=MESA_htable_del(htable, (uchar*)hkey, strlen(hkey), NULL); + } + else + { + ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey),changed+i); + } + if(ret<0)//failed + { + goto error_out; + } + } + history_num=MESA_htable_get_elem_num(htable); + tmp_array.cnt=0; + tmp_array.size=history_num; + tmp_array.array=(struct serial_rule_t*)calloc(history_num,sizeof(struct serial_rule_t)); + MESA_htable_iterate(htable, save_serial_rule_cb, &tmp_array); + *history_result=tmp_array.array; + ret=history_num; +error_out: + MESA_htable_destroy(htable, NULL); + return ret; +} +int get_rm_key_list(redisContext *c, long long instance_version, long long desired_version, long long* new_version, struct serial_rule_t** list,int *update_type, void* logger, int cumulative_off) +{ + redisReply* reply=NULL,*sub_reply=NULL; char err_buff[256]; - char op_str[4]; - long long version_in_redis=0,target_version=0,nearest_rule_version=0; - int rule_num=0; + long long redis_version=0,target_version=0; + int rule_num=0, changed_rule_num=0; int ret=0; unsigned int i=0,full_idx =0,append_cmd_cnt=0; - struct serial_rule_t *s_rule=NULL; + struct serial_rule_t *s_rule_array=NULL, *changed_rule_array=NULL, *history_rule_array=NULL; reply=(redisReply*)redisCommand(c, "GET MAAT_VERSION"); if(reply!=NULL) @@ -357,110 +503,64 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis _wrap_redisReconnect(c,logger); return -1; } - version_in_redis=read_redis_integer(reply); + redis_version=read_redis_integer(reply); freeReplyObject(reply); - if(version_in_redis==version) + if(redis_version==instance_version) { return 0; } - if(version==0) + if(instance_version==0||desired_version!=0) { goto FULL_UPDATE; } - if(version_in_redis Redis: %lld.",version,version_in_redis); + "VERSION roll back MAAT: %lld -> Redis: %lld.",instance_version,redis_version); goto FULL_UPDATE; } - if(version_in_redis>version&&cumulative_off==1) + if(redis_version>instance_version&&cumulative_off==1) { - target_version=version; + target_version=instance_version; } else { - target_version=version_in_redis-1; + target_version=redis_version-1; } do{ - target_version++; - //Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis. - //The elements are considered to be ordered from low to high scores(version). - reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,version,target_version); - - if(reply==NULL) + rule_num=get_inc_key_list(instance_version, target_version, c, &s_rule_array,logger); + if(ret>0) { - __redis_strerror_r(errno,err_buff,sizeof(err_buff)); - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "GET %s failed %s.",rm_status_sset,err_buff); - return -1; - } - assert(reply->type==REDIS_REPLY_ARRAY); - rule_num=reply->elements; - if(reply->elements==0) - { - //a duplicate rule_id would induce this error. - freeReplyObject(reply); + break; } - - }while(rule_num==0&&target_version<=version_in_redis&&cumulative_off==1); - if(rule_num==0) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s" - ,rm_status_sset,version,target_version-1,cumulative_off==1?"OFF":"ON"); - goto FULL_UPDATE; - } - tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str); - if(tmp_reply->type!=REDIS_REPLY_STRING) - { - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,version, target_version); - free(tmp_reply); - free(reply); - goto FULL_UPDATE; - - } - nearest_rule_version=read_redis_integer(tmp_reply); - freeReplyObject(tmp_reply); - tmp_reply=NULL; - if(nearest_rule_version!=version+1) - { - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,version); - - goto FULL_UPDATE; - } - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Inc Update form version %lld to %lld (%lld entries).",version,target_version,reply->elements); - - s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); - for(i=0;ielements;i++) - { - assert(reply->element[i]->type==REDIS_REPLY_STRING); - ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%d",op_str,s_rule[i].table_name,&(s_rule[i].rule_id)); - assert(ret==3); - if(strncmp(op_str,"ADD",strlen("ADD"))==0) + else if(ret<0) { - s_rule[i].op=MAAT_OP_ADD; - } - else if(strncmp(op_str,"DEL",strlen("DEL"))==0) - { - s_rule[i].op=MAAT_OP_DEL; + goto FULL_UPDATE; } else { - assert(0); + //ret=0, nothing to do. } + + }while(rule_num==0&&target_version<=redis_version&&cumulative_off==1); + if(rule_num==0) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s" + ,rm_status_sset,instance_version,target_version-1,cumulative_off==1?"OFF":"ON"); + goto FULL_UPDATE; } - *list=s_rule; + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "Inc Update form instance_version %lld to %lld (%lld entries).",instance_version,target_version,rule_num); + *list=s_rule_array; *update_type=CM_UPDATE_TYPE_INC; - freeReplyObject(reply); *new_version=target_version; - return i; + return rule_num; + FULL_UPDATE: MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Initiate full udpate from version %d to %lld.",version,version_in_redis); + "Initiate full udpate from instance_version %d to %lld.",instance_version,desired_version==0?redis_version:desired_version); append_cmd_cnt=0; ret=redisAppendCommand(c, "MULTI"); append_cmd_cnt++; @@ -486,19 +586,48 @@ FULL_UPDATE: *new_version=read_redis_integer(reply->element[0]); sub_reply=reply->element[1]; assert(sub_reply->type==REDIS_REPLY_ARRAY); - s_rule=(struct serial_rule_t*)calloc(sub_reply->elements,sizeof(struct serial_rule_t)); + rule_num=sub_reply->elements; + s_rule_array=(struct serial_rule_t*)calloc(rule_num,sizeof(struct serial_rule_t)); for(full_idx=0;full_idxelements;full_idx++) { assert(sub_reply->element[full_idx]->type==REDIS_REPLY_STRING); - ret=sscanf(sub_reply->element[full_idx]->str,"%*[^:]:%[^,],%d",s_rule[full_idx].table_name,&(s_rule[full_idx].rule_id)); - s_rule[full_idx].op=MAAT_OP_ADD; + ret=sscanf(sub_reply->element[full_idx]->str,"%*[^:]:%[^,],%d",s_rule_array[full_idx].table_name,&(s_rule_array[full_idx].rule_id)); + s_rule_array[full_idx].op=MAAT_OP_ADD; assert(ret==2); } freeReplyObject(reply); - *list=s_rule; + if(desired_version!=0) + { + changed_rule_num=get_inc_key_list(desired_version, redis_version, c, &changed_rule_array, logger); + if(changed_rule_num<0) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Recover history version %lld faild where as redis version is %lld.", desired_version, redis_version); + } + else if(changed_rule_num==0) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Nothing to recover from history version %lld to redis version is %lld.", desired_version, redis_version); + } + else + { + ret=recovery_history_version(s_rule_array, full_idx, changed_rule_array, changed_rule_num, &history_rule_array); + if(ret>0) + { + free(s_rule_array); + s_rule_array=history_rule_array; + rule_num=ret; + *new_version=desired_version; + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Successfully recovered from history version %lld to redis version is %lld.", desired_version, redis_version); + } + } + free(changed_rule_array); + } + *list=s_rule_array; *update_type=CM_UPDATE_TYPE_FULL; - return full_idx ; + return rule_num ; } int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger) @@ -1155,7 +1284,9 @@ void redis_monitor_traverse(long long version,redisContext *c, { return; } - rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type,feather->cumulative_update_off); + + rule_num=get_rm_key_list(c, version, feather->load_from_specific_version, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off); + feather->load_from_specific_version=0;//only valid for one time. if(rule_num<0||(rule_num==0&&update_type==CM_UPDATE_TYPE_INC))//error or nothing changed { return; diff --git a/src/entry/Maat_rule_internal.h b/src/entry/Maat_rule_internal.h index 8271abb..5b2cc3f 100644 --- a/src/entry/Maat_rule_internal.h +++ b/src/entry/Maat_rule_internal.h @@ -393,6 +393,7 @@ struct _Maat_feather_t struct _Maat_cmd_inner_t* cmd_qhead, *cmd_qtail; pthread_mutex_t redis_write_lock; //protect redis_write_ctx long long base_rgn_seq,base_grp_seq,server_time; + long long load_from_specific_version; //internal states long long new_version; int active_plugin_table_num; @@ -472,7 +473,7 @@ void maat_stat_output(struct _Maat_feather_t* feather); char* _maat_strdup(const char* s); char* str_unescape(char* s); redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...); -int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** list,void* logger, long long* new_version,int *update_type, int cumulative_off); +int get_rm_key_list(redisContext *c, long long instance_version, long long desired_version, long long* new_version, struct serial_rule_t** list,int *update_type, void* logger, int cumulative_off); int get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger,int print_process); void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_id,int label_id,const char* table_name,const char* line, long long timeout); void empty_serial_rules(struct serial_rule_t* rule); diff --git a/src/entry/map_str2int.h b/src/entry/map_str2int.h index 54e93cd..dce9d97 100644 --- a/src/entry/map_str2int.h +++ b/src/entry/map_str2int.h @@ -1,5 +1,6 @@ #ifndef __MAP_STR2INT_H_INCLUDE_ #define __MAP_STR2INT_H_INCLUDE_ +#include MESA_htable_handle map_create(void); void map_destroy(MESA_htable_handle p); int map_register(MESA_htable_handle handle,const char* string,int value); diff --git a/tools/maat_redis_tool.cpp b/tools/maat_redis_tool.cpp index 9fae08f..8b0a53c 100644 --- a/tools/maat_redis_tool.cpp +++ b/tools/maat_redis_tool.cpp @@ -21,6 +21,7 @@ void maat_tool_print_usage(void) printf("\t-p [port], redis port, 6379 as default.\n"); printf("\t-n [db], redis db, 0 as default.\n"); printf("\t-d [dir], dump rules from redis to [dir], %s as default.\n",redis_dump_dir); + printf("\t-d [version], dump specific [version] from redis, dump latest version as default.\n"); printf("\t-j [payload.json], add or delete rules as maat json. Must have field compile_table field, and plugin table's valid flag must be in the last column.\n"); printf("\t-t [timeout], timeout config after t seconds, default is 0 which means never timeout.\n"); printf("example: ./maat_redis_tool -h 127.0.0.1 -p 6379 -d %s\n",redis_dump_dir); @@ -57,7 +58,7 @@ static redisContext * connect_redis(const char*redis_ip, int redis_port, int red return ctx; } -void read_rule_from_redis(redisContext * ctx,const char* output_path ,void*logger) +void read_rule_from_redis(redisContext * ctx, long long desire_version, const char* output_path ,void*logger) { struct serial_rule_t* rule_list; int rule_num=0,line_count=0; @@ -68,11 +69,18 @@ void read_rule_from_redis(redisContext * ctx,const char* output_path ,void*logge char table_path[256],index_path[256]; FILE *table_fp=NULL, *index_fp=NULL; - - rule_num=get_rm_key_list(0, ctx, &rule_list, logger,&version, &update_type,0); + rule_num=get_rm_key_list(ctx, 0, desire_version, &version, &rule_list, &update_type, logger,0); + if(desire_version!=0) if(rule_num==0) { - printf("No Effective Rules.\n"); + if(desire_version!=0) + { + printf("Read desired version %lld failed.\n",desire_version); + } + else + { + printf("No Effective Rules.\n"); + } return; } if(rule_num<0) @@ -226,9 +234,10 @@ int main(int argc, char * argv[]) cJSON *json=NULL, *tmp_obj=NULL; struct stat fstat_buf; unsigned long json_file_size=0,read_size=0; + long long desired_version=0; char* json_buff=NULL; - while((oc=getopt(argc,argv,"h:p:n:d:f:j:t:"))!=-1) + while((oc=getopt(argc,argv,"h:p:n:d:v:f:j:t:"))!=-1) { switch(oc) { @@ -249,6 +258,9 @@ int main(int argc, char * argv[]) dump_dir[strlen(dump_dir)-1]='\0'; } break; + case 'v': + sscanf(optarg,"%lld",&desired_version); + break; case 'j': strncpy(json_file, optarg,sizeof(json_file)); model=WORK_MODE_JSON; @@ -308,7 +320,7 @@ int main(int argc, char * argv[]) if(model==WORK_MODE_DUMP) { printf("Reading key list from %s:%d db%d.\n",redis_ip,redis_port,redis_db); - read_rule_from_redis(ctx,dump_dir, NULL); + read_rule_from_redis(ctx,desired_version,dump_dir, NULL); } else if(model==WORK_MODE_JSON) {