增加增量更新状态删除功能。实现select index 功能。

This commit is contained in:
zhengchao
2017-08-10 18:31:20 +08:00
parent 37972b3552
commit f72cf74b8c
4 changed files with 78 additions and 18 deletions

View File

@@ -599,7 +599,7 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
_feather->redis_index=*((int*)value);
break;
case MAAT_OPT_CMD_AUTO_NUMBERING:
if((size_t)size!=sizeof(int))
if((size_t)size!=sizeof(int)||*((int*)value)>15||*((int*)value)<0)
{
return -1;
}
@@ -613,6 +613,7 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
int Maat_initiate_feather(Maat_feather_t feather)
{
_Maat_feather_t* _feather=(_Maat_feather_t*)feather;
redisReply* reply=NULL;
if(strlen(_feather->redis_ip)>0&&_feather->redis_port!=0)
{
@@ -630,6 +631,8 @@ int Maat_initiate_feather(Maat_feather_t feather)
return -1;
}
redisEnableKeepAlive(_feather->redis_read_ctx);
reply=_wrap_redisCommand(_feather->redis_read_ctx, "select %d",_feather->redis_index);
freeReplyObject(reply);
redis_monitor_traverse(_feather->maat_version
,_feather->redis_read_ctx
,maat_start_cb

View File

@@ -14,9 +14,10 @@ const char* maat_redis_command="MAAT_REDIS_COMMAND";
const char* rm_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"};
const char* rm_status_sset="MAAT_UPDATE_STATUS";
const char* rm_expire_sset="MAAT_EXPIRE_TIMER";
const char* rm_expire_sset="MAAT_RULE_TIMER";
const char* rm_label_sset="MAAT_LABEL_INDEX";
const int MAAT_REDIS_SYNC_TIME=30*60;
const char* rm_version_sset="MAAT_VERSION_TIMER";
const static int MAAT_REDIS_SYNC_TIME=30*60;
struct serial_rule_t //rm= Redis Maat
{
@@ -52,6 +53,7 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
int connect_redis_for_write(_Maat_feather_t * feather)
{
int ret=0;
redisReply* reply=NULL;
assert(feather->redis_write_ctx==NULL);
feather->redis_write_ctx=redisConnectWithTimeout(feather->redis_ip, feather->redis_port,feather->connect_timeout);
if(feather->redis_write_ctx==NULL)
@@ -61,6 +63,11 @@ int connect_redis_for_write(_Maat_feather_t * feather)
,feather->redis_ip,feather->redis_port);
ret=-1;
}
else
{
reply=_wrap_redisCommand(feather->redis_read_ctx, "select %d",feather->redis_index);
freeReplyObject(reply);
}
return ret;
}
long long read_redis_integer(const redisReply* reply)
@@ -386,11 +393,9 @@ int get_rm_key_list(unsigned int version,redisContext *c,struct serial_rule_t**
goto FULL_UPDATE;
}
else
{
MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor,
"Inc Update form version %d to %lld.",version,version_in_redis);
}
s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t));
for(i=0;i<reply->elements;i++)
{
@@ -675,7 +680,7 @@ int mr_transaction_success(redisReply* data_reply)
return 1;
}
}
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num)
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time)
{
int append_cmd_cnt=0,i=0;
long long maat_redis_version=0;
@@ -729,6 +734,8 @@ int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_r
}
redisAppendCommand(ctx,"INCRBY MAAT_VERSION 1");
append_cmd_cnt++;
redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_version_sset,server_time,maat_redis_version);
append_cmd_cnt++;
redisAppendCommand(ctx,"EXEC");
append_cmd_cnt++;
redis_transaction_success=1;
@@ -803,9 +810,9 @@ void check_maat_expiration(redisContext *ctx, void *logger)
redisReply* data_reply=NULL;
struct serial_rule_t* s_rule=NULL;
long long server_time=0;
data_reply=_wrap_redisCommand(ctx, "TIME");
server_time=data_reply->element[0]->integer;
freeReplyObject(data_reply);
server_time=redis_server_time(ctx);
data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_expire_sset,server_time);
if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0)
{
@@ -821,21 +828,69 @@ void check_maat_expiration(redisContext *ctx, void *logger)
assert(ret==2);
}
freeReplyObject(data_reply);
is_success=exec_serial_rule(ctx,s_rule, s_rule_num);
is_success=exec_serial_rule(ctx,s_rule, s_rule_num,server_time);
if(is_success==1)
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Succesfully expried %d rules in Redis.", s_rule_num);
}
else
{
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_module
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Failed to expried %d rules in Redis.", s_rule_num);
}
free(s_rule);
return;
}
void cleanup_update_status(redisContext *ctx, void *logger)
{
redisReply* reply=NULL,*sub_reply=NULL;
int append_cmd_cnt=0,i=0;
long long server_time=0, version_upper_bound=0,version_lower_bound=0,version_num=0,entry_num=0;
server_time=redis_server_time(ctx);
reply=_wrap_redisCommand(ctx,"MULTI");
freeReplyObject(reply);
redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",rm_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
append_cmd_cnt++;
redisAppendCommand(ctx, "ZREMRANGEBYSCORE %s -inf %lld",rm_version_sset,server_time-MAAT_REDIS_SYNC_TIME);
append_cmd_cnt++;
//consume reply "OK" and "QUEUED".
for(i=0;i<append_cmd_cnt;i++)
{
_wrap_redisGetReply(ctx, &reply);
freeReplyObject(reply);
reply=NULL;
}
reply=_wrap_redisCommand(ctx,"EXEC");
assert(reply->type==REDIS_REPLY_ARRAY);
sub_reply=reply->element[0];
assert(sub_reply->type==REDIS_REPLY_ARRAY);
version_num=sub_reply->elements;
if(version_num==0)
{
freeReplyObject(reply);
return;
}
version_lower_bound=read_redis_integer(sub_reply->element[0]);
version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]);
freeReplyObject(reply);
reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s -inf %lld",rm_expire_sset,version_upper_bound);
entry_num=read_redis_integer(reply);
freeReplyObject(reply);
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor
,"Clean up updaste status from version %lld to %lld (%lld versions, %lld entries)."
,version_lower_bound
,version_upper_bound
,version_num
,entry_num);
}
void redis_monitor_traverse(unsigned int version,redisContext *c,
void (*start)(unsigned int ,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
@@ -856,6 +911,7 @@ void redis_monitor_traverse(unsigned int version,redisContext *c,
{
//For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
check_maat_expiration(feather->redis_read_ctx, logger);
cleanup_update_status(feather->redis_read_ctx, logger);
}
rule_num=get_rm_key_list(version, c, &rule_list, logger,&new_version, &update_type);
if(rule_num==0)
@@ -1020,7 +1076,7 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule
ret=0;
while(!ret)
{
ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1);
ret=exec_serial_rule(_feather->redis_write_ctx,&s_rule, 1,_feather->server_time);
retry++;
}
if(retry>10)
@@ -1189,7 +1245,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
transection_success=0;
while(!transection_success)
{
transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num);
transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num,_feather->server_time);
if(transection_success!=1)
{
retry++;

View File

@@ -28,7 +28,7 @@
#include "stream_fuzzy_hash.h"
#include "gram_index_engine.h"
int MAAT_FRAME_VERSION_2_0_20170809=1;
int MAAT_FRAME_VERSION_2_0_20170810=1;
const char *maat_module="MAAT Frame";
const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin",

View File

@@ -452,6 +452,7 @@ void maat_stat_table(struct _Maat_table_info_t* p_table,int scan_len,struct time
void maat_stat_output(struct _Maat_feather_t* feather);
char* _maat_strdup(const char* s);
char* str_unescape(char* s);
redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...);
void redis_monitor_traverse(unsigned int version,redisContext *c,
void (*start)(unsigned int ,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para