MAAT版本号从32位升级到64位,同时支持多个内部状态暴露,支持设置逐版本号加载。

This commit is contained in:
zhengchao
2017-12-06 18:12:32 +08:00
parent 6e7e8214cc
commit 9aa6917b31
9 changed files with 129 additions and 54 deletions

View File

@@ -355,7 +355,7 @@ int _wrap_redisReconnect(redisContext* c, void*logger)
return -1;
}
}
int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t** list,void* logger, unsigned int* new_version,int *update_type)
int get_rm_key_list(long long version,redisContext *c,struct serial_rule_t** list,void* logger, long long* new_version,int *update_type, int cumulative_off)
{
redisReply* reply=NULL,*sub_reply=NULL,*tmp_reply=NULL;
char err_buff[256];
@@ -392,7 +392,6 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
{
return 0;
}
*new_version=version_in_redis;
if(version==0)
{
@@ -401,14 +400,18 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
if(version_in_redis<version)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"VERSION roll back MAAT: %d -> Redis: %lld.",version,version_in_redis);
"VERSION roll back MAAT: %lld -> Redis: %lld.",version,version_in_redis);
goto FULL_UPDATE;
}
if(version_in_redis>version&&cumulative_off==1)
{
version_in_redis=version+1;
}
//Returns all the elements in the sorted set at key with a score that version < score <= version_in_redis.
//The elements are considered to be ordered from low to high scores(version).
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%d %d",rm_status_sset,version,version_in_redis);
reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,version,version_in_redis);
if(reply==NULL)
{
__redis_strerror_r(errno,err_buff,sizeof(err_buff));
@@ -420,7 +423,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
if(reply->elements==0)
{
//a duplicate rule_id would induce this error.
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%d %d",rm_status_sset,version,version_in_redis);
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld",rm_status_sset,version,version_in_redis);
freeReplyObject(reply);
return -1;
}
@@ -428,7 +431,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
if(tmp_reply->type!=REDIS_REPLY_STRING)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"ZSCORE %s %s failed Version: %d->%d",rm_status_sset,reply->element[0]->str,version, version_in_redis);
"ZSCORE %s %s failed Version: %lld->%lld",rm_status_sset,reply->element[0]->str,version, version_in_redis);
free(tmp_reply);
free(reply);
goto FULL_UPDATE;
@@ -440,12 +443,12 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
if(nearest_rule_version!=version+1)
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Noncontinuous VERSION Redis: %lld MAAT: %d.",nearest_rule_version,version);
"Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,version);
goto FULL_UPDATE;
}
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Inc Update form version %d to %lld (%lld entries).",version,version_in_redis,reply->elements);
"Inc Update form version %lld to %lld (%lld entries).",version,version_in_redis,reply->elements);
s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t));
for(i=0;i<reply->elements;i++)
@@ -469,6 +472,7 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
*list=s_rule;
*update_type=CM_UPDATE_TYPE_INC;
freeReplyObject(reply);
*new_version=version_in_redis;
return i;
FULL_UPDATE:
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
@@ -1048,8 +1052,8 @@ void cleanup_update_status(redisContext *ctx, void *logger)
,entry_num);
}
void redis_monitor_traverse(unsigned int version,redisContext *c,
void (*start)(unsigned int ,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
void redis_monitor_traverse(long long version,redisContext *c,
void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
void (*finish)(void*),//u_para
void* u_para,
@@ -1060,7 +1064,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
int ret=0;
struct serial_rule_t* rule_list=NULL;
int update_type=CM_UPDATE_TYPE_INC;
unsigned int new_version=0;
long long new_version=0;
enum MAAT_TABLE_TYPE table_type;
void* logger=feather->logger;
if(feather->redis_write_ctx!=NULL)//authorized to write
@@ -1069,7 +1073,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
check_maat_expiration(feather->redis_read_ctx, logger);
cleanup_update_status(feather->redis_read_ctx, logger);
}
rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type);
rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type,feather->cumulative_update_off);
if(rule_num<0||(rule_num==0&&update_type==CM_UPDATE_TYPE_INC))//error or nothing changed
{
return;