Redis模式支持从特定配置版本开始初始化,可通过MAAT_OPT_LOAD_SPECIFIC_VERSION参数设置。

This commit is contained in:
zhengchao
2018-06-03 20:31:22 +08:00
parent a751eab6bb
commit 613be3c03f
6 changed files with 245 additions and 95 deletions

View File

@@ -325,16 +325,162 @@ int _wrap_redisReconnect(redisContext* c, void*logger)
return -1;
}
}
int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** list,void* logger, long long* new_version,int *update_type, int cumulative_off)
int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger)
{
redisReply* reply=NULL,*sub_reply=NULL,*tmp_reply=NULL;
redisReply* reply=NULL,*tmp_reply=NULL;
char err_buff[256], op_str[4];
int rule_num=0,ret=0;
unsigned int i=0;
long long nearest_rule_version;
struct serial_rule_t *s_rule=NULL;
//Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version.
//The elements are considered to be ordered from low to high scores(instance_version).
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,instance_version,target_version);
if(reply==NULL)
{
__redis_strerror_r(errno,err_buff,sizeof(err_buff));
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET %s failed %s.",rm_status_sset,err_buff);
return -1;
}
assert(reply->type==REDIS_REPLY_ARRAY);
rule_num=reply->elements;
if(reply->elements==0)
{
return 0;
}
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
if(tmp_reply->type!=REDIS_REPLY_STRING)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,instance_version, target_version);
freeReplyObject(tmp_reply);
freeReplyObject(reply);
return -1;
}
nearest_rule_version=read_redis_integer(tmp_reply);
freeReplyObject(tmp_reply);
tmp_reply=NULL;
if(nearest_rule_version!=instance_version+1)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,instance_version);
return -1;
}
s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t));
for(i=0;i<reply->elements;i++)
{
assert(reply->element[i]->type==REDIS_REPLY_STRING);
ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%d",op_str,s_rule[i].table_name,&(s_rule[i].rule_id));
assert(ret==3);
if(strncmp(op_str,"ADD",strlen("ADD"))==0)
{
s_rule[i].op=MAAT_OP_ADD;
}
else if(strncmp(op_str,"DEL",strlen("DEL"))==0)
{
s_rule[i].op=MAAT_OP_DEL;
}
else
{
assert(0);
}
}
rule_num=reply->elements;
*list=s_rule;
freeReplyObject(reply);
return rule_num;
}
struct s_rule_array_t
{
int cnt;
int size;
struct serial_rule_t* array;
};
void save_serial_rule_cb(const uchar * key, uint size, void * data, void * user)
{
struct s_rule_array_t* array=(struct s_rule_array_t*)user;
int i=array->cnt;
memcpy(&(array->array[i]),data,sizeof(struct serial_rule_t));
array->array[i].op=MAAT_OP_ADD;
return;
}
int recovery_history_version(const struct serial_rule_t* current, int current_num, const struct serial_rule_t* changed, int changed_num, struct serial_rule_t** history_result)
{
int i=0,ret=0;
unsigned int history_num=0;
int hash_slot_size=1;
MESA_htable_handle htable=NULL;
MESA_htable_create_args_t hargs;
struct s_rule_array_t tmp_array;
char hkey[256+20];
int tmp=current_num+changed_num;
for(;tmp>0;tmp=tmp/2)
{
hash_slot_size*=2;
}
memset(&hargs,0,sizeof(hargs));
hargs.thread_safe=0;
hargs.hash_slot_size = hash_slot_size;
hargs.max_elem_num = 0;
hargs.eliminate_type = HASH_ELIMINATE_ALGO_FIFO;
hargs.expire_time = 0;
hargs.key_comp = NULL;
hargs.key2index = NULL;
hargs.recursive = 1;
hargs.data_free = NULL;//data is an reference, no need to free.
hargs.data_expire_with_condition = NULL;
htable=MESA_htable_create(&hargs, sizeof(hargs));
MESA_htable_print_crtl(htable, 0);
for(i=0;i<current_num;i++)
{
snprintf(hkey,sizeof(hkey),"%d,%s",current[i].rule_id,current[i].table_name);
ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey), current+i);
assert(ret>0);
}
for(i=changed_num-1;i>=0;i--)
{
snprintf(hkey,sizeof(hkey),"%d,%s",changed[i].rule_id,changed[i].table_name);
if(changed[i].op==MAAT_OP_ADD)//newly added rule is need to delete from current, so that history version can be recovered.
{
ret=MESA_htable_del(htable, (uchar*)hkey, strlen(hkey), NULL);
}
else
{
ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey),changed+i);
}
if(ret<0)//failed
{
goto error_out;
}
}
history_num=MESA_htable_get_elem_num(htable);
tmp_array.cnt=0;
tmp_array.size=history_num;
tmp_array.array=(struct serial_rule_t*)calloc(history_num,sizeof(struct serial_rule_t));
MESA_htable_iterate(htable, save_serial_rule_cb, &tmp_array);
*history_result=tmp_array.array;
ret=history_num;
error_out:
MESA_htable_destroy(htable, NULL);
return ret;
}
int get_rm_key_list(redisContext *c, long long instance_version, long long desired_version, long long* new_version, struct serial_rule_t** list,int *update_type, void* logger, int cumulative_off)
{
redisReply* reply=NULL,*sub_reply=NULL;
char err_buff[256];
char op_str[4];
long long version_in_redis=0,target_version=0,nearest_rule_version=0;
int rule_num=0;
long long redis_version=0,target_version=0;
int rule_num=0, changed_rule_num=0;
int ret=0;
unsigned int i=0,full_idx =0,append_cmd_cnt=0;
struct serial_rule_t *s_rule=NULL;
struct serial_rule_t *s_rule_array=NULL, *changed_rule_array=NULL, *history_rule_array=NULL;
reply=(redisReply*)redisCommand(c, "GET MAAT_VERSION");
if(reply!=NULL)
@@ -357,110 +503,64 @@ int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** lis
_wrap_redisReconnect(c,logger);
return -1;
}
version_in_redis=read_redis_integer(reply);
redis_version=read_redis_integer(reply);
freeReplyObject(reply);
if(version_in_redis==version)
if(redis_version==instance_version)
{
return 0;
}
if(version==0)
if(instance_version==0||desired_version!=0)
{
goto FULL_UPDATE;
}
if(version_in_redis<version)
if(redis_version<instance_version)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"VERSION roll back MAAT: %lld -> Redis: %lld.",version,version_in_redis);
"VERSION roll back MAAT: %lld -> Redis: %lld.",instance_version,redis_version);
goto FULL_UPDATE;
}
if(version_in_redis>version&&cumulative_off==1)
if(redis_version>instance_version&&cumulative_off==1)
{
target_version=version;
target_version=instance_version;
}
else
{
target_version=version_in_redis-1;
target_version=redis_version-1;
}
do{
target_version++;
//Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis.
//The elements are considered to be ordered from low to high scores(version).
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,version,target_version);
if(reply==NULL)
rule_num=get_inc_key_list(instance_version, target_version, c, &s_rule_array,logger);
if(ret>0)
{
__redis_strerror_r(errno,err_buff,sizeof(err_buff));
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET %s failed %s.",rm_status_sset,err_buff);
return -1;
}
assert(reply->type==REDIS_REPLY_ARRAY);
rule_num=reply->elements;
if(reply->elements==0)
{
//a duplicate rule_id would induce this error.
freeReplyObject(reply);
break;
}
}while(rule_num==0&&target_version<=version_in_redis&&cumulative_off==1);
if(rule_num==0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s"
,rm_status_sset,version,target_version-1,cumulative_off==1?"OFF":"ON");
goto FULL_UPDATE;
}
tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",rm_status_sset,reply->element[0]->str);
if(tmp_reply->type!=REDIS_REPLY_STRING)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,version, target_version);
free(tmp_reply);
free(reply);
goto FULL_UPDATE;
}
nearest_rule_version=read_redis_integer(tmp_reply);
freeReplyObject(tmp_reply);
tmp_reply=NULL;
if(nearest_rule_version!=version+1)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,version);
goto FULL_UPDATE;
}
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Inc Update form version %lld to %lld (%lld entries).",version,target_version,reply->elements);
s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t));
for(i=0;i<reply->elements;i++)
{
assert(reply->element[i]->type==REDIS_REPLY_STRING);
ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%d",op_str,s_rule[i].table_name,&(s_rule[i].rule_id));
assert(ret==3);
if(strncmp(op_str,"ADD",strlen("ADD"))==0)
else if(ret<0)
{
s_rule[i].op=MAAT_OP_ADD;
}
else if(strncmp(op_str,"DEL",strlen("DEL"))==0)
{
s_rule[i].op=MAAT_OP_DEL;
goto FULL_UPDATE;
}
else
{
assert(0);
//ret=0, nothing to do.
}
}while(rule_num==0&&target_version<=redis_version&&cumulative_off==1);
if(rule_num==0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s"
,rm_status_sset,instance_version,target_version-1,cumulative_off==1?"OFF":"ON");
goto FULL_UPDATE;
}
*list=s_rule;
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Inc Update form instance_version %lld to %lld (%lld entries).",instance_version,target_version,rule_num);
*list=s_rule_array;
*update_type=CM_UPDATE_TYPE_INC;
freeReplyObject(reply);
*new_version=target_version;
return i;
return rule_num;
FULL_UPDATE:
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Initiate full udpate from version %d to %lld.",version,version_in_redis);
"Initiate full udpate from instance_version %d to %lld.",instance_version,desired_version==0?redis_version:desired_version);
append_cmd_cnt=0;
ret=redisAppendCommand(c, "MULTI");
append_cmd_cnt++;
@@ -486,19 +586,48 @@ FULL_UPDATE:
*new_version=read_redis_integer(reply->element[0]);
sub_reply=reply->element[1];
assert(sub_reply->type==REDIS_REPLY_ARRAY);
s_rule=(struct serial_rule_t*)calloc(sub_reply->elements,sizeof(struct serial_rule_t));
rule_num=sub_reply->elements;
s_rule_array=(struct serial_rule_t*)calloc(rule_num,sizeof(struct serial_rule_t));
for(full_idx=0;full_idx<sub_reply->elements;full_idx++)
{
assert(sub_reply->element[full_idx]->type==REDIS_REPLY_STRING);
ret=sscanf(sub_reply->element[full_idx]->str,"%*[^:]:%[^,],%d",s_rule[full_idx].table_name,&(s_rule[full_idx].rule_id));
s_rule[full_idx].op=MAAT_OP_ADD;
ret=sscanf(sub_reply->element[full_idx]->str,"%*[^:]:%[^,],%d",s_rule_array[full_idx].table_name,&(s_rule_array[full_idx].rule_id));
s_rule_array[full_idx].op=MAAT_OP_ADD;
assert(ret==2);
}
freeReplyObject(reply);
*list=s_rule;
if(desired_version!=0)
{
changed_rule_num=get_inc_key_list(desired_version, redis_version, c, &changed_rule_array, logger);
if(changed_rule_num<0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"Recover history version %lld faild where as redis version is %lld.", desired_version, redis_version);
}
else if(changed_rule_num==0)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"Nothing to recover from history version %lld to redis version is %lld.", desired_version, redis_version);
}
else
{
ret=recovery_history_version(s_rule_array, full_idx, changed_rule_array, changed_rule_num, &history_rule_array);
if(ret>0)
{
free(s_rule_array);
s_rule_array=history_rule_array;
rule_num=ret;
*new_version=desired_version;
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"Successfully recovered from history version %lld to redis version is %lld.", desired_version, redis_version);
}
}
free(changed_rule_array);
}
*list=s_rule_array;
*update_type=CM_UPDATE_TYPE_FULL;
return full_idx ;
return rule_num ;
}
int _get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger)
@@ -1155,7 +1284,9 @@ void redis_monitor_traverse(long long version,redisContext *c,
{
return;
}
rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type,feather->cumulative_update_off);
rule_num=get_rm_key_list(c, version, feather->load_from_specific_version, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off);
feather->load_from_specific_version=0;//only valid for one time.
if(rule_num<0||(rule_num==0&&update_type==CM_UPDATE_TYPE_INC))//error or nothing changed
{
return;