重构连接redis的代码。

This commit is contained in:
zhengchao
2018-11-27 19:53:42 +08:00
parent bcfb1e2ac8
commit 56ecf3eed4
5 changed files with 151 additions and 169 deletions

View File

@@ -442,7 +442,7 @@ Maat_feather_t Maat_feather(int max_thread_num,const char* table_info_path,void*
,max_thread_num); ,max_thread_num);
return NULL; return NULL;
} }
_Maat_feather_t* feather=(_Maat_feather_t*)calloc(sizeof(struct _Maat_feather_t),1); _Maat_feather_t* feather=ALLOC(struct _Maat_feather_t, 1);
feather->table_cnt=read_table_info(feather->p_table_info, MAX_TABLE_NUM,table_info_path,max_thread_num,logger); feather->table_cnt=read_table_info(feather->p_table_info, MAX_TABLE_NUM,table_info_path,max_thread_num,logger);
feather->map_tablename2id=map_create(); feather->map_tablename2id=map_create();
int i=0,j=0,ret=0; int i=0,j=0,ret=0;
@@ -489,15 +489,11 @@ Maat_feather_t Maat_feather(int max_thread_num,const char* table_info_path,void*
feather->last_full_version=0; feather->last_full_version=0;
feather->base_grp_seq=0; feather->base_grp_seq=0;
feather->base_rgn_seq=0; feather->base_rgn_seq=0;
feather->redis_index=0;
feather->AUTO_NUMBERING_ON=1; feather->AUTO_NUMBERING_ON=1;
feather->connect_timeout.tv_sec=0;
feather->connect_timeout.tv_usec=100*1000; // 100 ms
feather->backgroud_update_enabled=1; feather->backgroud_update_enabled=1;
feather->foreign_cont_linger=0; feather->foreign_cont_linger=0;
snprintf(feather->foreign_cont_dir, sizeof(feather->foreign_cont_dir), "%s_files", table_info_path); snprintf(feather->foreign_cont_dir, sizeof(feather->foreign_cont_dir), "%s_files", table_info_path);
pthread_mutex_init(&(feather->backgroud_update_mutex),NULL); pthread_mutex_init(&(feather->backgroud_update_mutex),NULL);
pthread_mutex_init(&(feather->redis_write_lock),NULL);
snprintf(feather->table_info_fn,sizeof(feather->table_info_fn),"%s",table_info_path); snprintf(feather->table_info_fn,sizeof(feather->table_info_fn),"%s",table_info_path);
return feather; return feather;
} }
@@ -627,20 +623,20 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
memcpy(_feather->decrypt_key,value,size); memcpy(_feather->decrypt_key,value,size);
break; break;
case MAAT_OPT_REDIS_IP: case MAAT_OPT_REDIS_IP:
if((size_t)size>sizeof(_feather->redis_ip)) if((size_t)size>sizeof(_feather->mr_ctx.redis_ip))
{ {
return -1; return -1;
} }
memcpy(_feather->redis_ip,value,size); memcpy(_feather->mr_ctx.redis_ip,value,size);
break; break;
case MAAT_OPT_REDIS_PORT: case MAAT_OPT_REDIS_PORT:
if((size_t)size==sizeof(unsigned short)) if((size_t)size==sizeof(unsigned short))
{ {
_feather->redis_port=*((unsigned short*)value); _feather->mr_ctx.redis_port=*((unsigned short*)value);
} }
else if((size_t)size==sizeof(int)) else if((size_t)size==sizeof(int))
{ {
_feather->redis_port=*((int*)value); _feather->mr_ctx.redis_port=*((int*)value);
} }
else else
{ {
@@ -652,7 +648,7 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
{ {
return -1; return -1;
} }
_feather->redis_index=*((int*)value); _feather->mr_ctx.redis_db=*((int*)value);
break; break;
case MAAT_OPT_CMD_AUTO_NUMBERING: case MAAT_OPT_CMD_AUTO_NUMBERING:
if((size_t)size!=sizeof(int)||*((int*)value)>15||*((int*)value)<0) if((size_t)size!=sizeof(int)||*((int*)value)>15||*((int*)value)<0)
@@ -700,34 +696,20 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo
} }
void maat_read_full_config(_Maat_feather_t* _feather) void maat_read_full_config(_Maat_feather_t* _feather)
{ {
redisReply* reply=NULL; struct maat_redis_ctx* mr_ctx=&(_feather->mr_ctx);
if(strlen(_feather->redis_ip)>0&&_feather->redis_port!=0) if(strlen(mr_ctx->redis_ip)>0&&mr_ctx->redis_port!=0)
{ {
_feather->REDIS_MODE_ON=1; _feather->REDIS_MODE_ON=1;
MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module , MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module ,
"Maat initiate from Redis %s:%hu db%d." "Maat initiate from Redis %s:%hu db%d."
,_feather->redis_ip ,mr_ctx->redis_ip
,_feather->redis_port ,mr_ctx->redis_port
,_feather->redis_index); ,mr_ctx->redis_db);
_feather->redis_read_ctx=redisConnectWithTimeout(_feather->redis_ip,_feather->redis_port,_feather->connect_timeout); mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, _feather->logger);
if(_feather->redis_read_ctx==NULL||_feather->redis_read_ctx->err) if(mr_ctx->read_ctx != NULL)
{ {
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module
,"Redis connect %s:%d failed : %s."
,_feather->redis_ip,_feather->redis_port,_feather->redis_read_ctx==NULL?"Unkonwn":_feather->redis_read_ctx->errstr);
if(_feather->redis_read_ctx!=NULL)
{
redisFree(_feather->redis_write_ctx);
_feather->redis_write_ctx=NULL;
}
}
else
{
redisEnableKeepAlive(_feather->redis_read_ctx);
reply=_wrap_redisCommand(_feather->redis_read_ctx, "select %d",_feather->redis_index);
freeReplyObject(reply);
redis_monitor_traverse(_feather->maat_version redis_monitor_traverse(_feather->maat_version
,_feather->redis_read_ctx ,mr_ctx
,maat_start_cb ,maat_start_cb
,maat_update_cb ,maat_update_cb
,maat_finish_cb ,maat_finish_cb
@@ -760,8 +742,8 @@ void maat_read_full_config(_Maat_feather_t* _feather)
{ {
MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module , MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module ,
"At initiation: no avilable rule in redis in %s:%hu" "At initiation: no avilable rule in redis in %s:%hu"
,_feather->redis_ip ,mr_ctx->redis_ip
,_feather->redis_port); ,mr_ctx->redis_port);
} }
else else

View File

@@ -58,30 +58,65 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...)
} }
return (redisReply *)reply; return (redisReply *)reply;
} }
int connect_redis_for_write(_Maat_feather_t * feather) redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger)
{ {
int ret=0; struct timeval connect_timeout;
connect_timeout.tv_sec=0;
connect_timeout.tv_usec=100*1000; // 100 ms
redisReply* reply=NULL; redisReply* reply=NULL;
assert(feather->redis_write_ctx==NULL);
feather->redis_write_ctx=redisConnectWithTimeout(feather->redis_ip, feather->redis_port,feather->connect_timeout); redisContext * ctx;
if(feather->redis_write_ctx==NULL||feather->redis_write_ctx->err) ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout);
if(ctx==NULL||ctx->err)
{ {
MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module if(logger==NULL)
,"Redis connect %s:%d for write failed: %s."
,feather->redis_ip,feather->redis_port,feather->redis_write_ctx==NULL?"Unkown":feather->redis_write_ctx->errstr);
if(feather->redis_write_ctx!=NULL)
{ {
redisFree(feather->redis_write_ctx); printf("Unable to connect redis server %s:%d db%d, error: %s\n",
feather->redis_write_ctx=NULL; redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr);
}
ret=-1;
} }
else else
{ {
reply=_wrap_redisCommand(feather->redis_write_ctx, "select %d",feather->redis_index); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
freeReplyObject(reply); "Unable to connect redis server %s:%d db%d, error: %s\n",
redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr);
} }
return ret; if(ctx!=NULL) redisFree(ctx);
return NULL;
}
redisEnableKeepAlive(ctx);
reply=_wrap_redisCommand(ctx, "select %d",redis_db);
freeReplyObject(reply);
return ctx;
}
int connect_redis_for_write(struct maat_redis_ctx* mr_ctx, void* logger)
{
assert(mr_ctx->write_ctx==NULL);
mr_ctx->write_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, logger);
if(mr_ctx->write_ctx==NULL)
{
return -1;
}
else
{
return 0;
}
}
redisContext* get_redis_ctx_for_write(_Maat_feather_t * feather)
{
int ret=0;
if(feather->mr_ctx.write_ctx==NULL)
{
ret=connect_redis_for_write(&(feather->mr_ctx), feather->logger);
if(ret!=0)
{
return NULL;
}
}
return feather->mr_ctx.write_ctx;
} }
long long read_redis_integer(const redisReply* reply) long long read_redis_integer(const redisReply* reply)
{ {
@@ -319,22 +354,6 @@ void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_
} }
return; return;
} }
int _wrap_redisReconnect(redisContext* c, void*logger)
{
int ret=0;
ret=redisReconnect(c);
if(ret==REDIS_OK)
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect success.");
return 0;
}
else
{
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect failed.");
return -1;
}
}
int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger) int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger)
{ {
redisReply* reply=NULL,*tmp_reply=NULL; redisReply* reply=NULL,*tmp_reply=NULL;
@@ -523,8 +542,7 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir
memset(err_buff, 0, sizeof(err_buff)); memset(err_buff, 0, sizeof(err_buff));
__redis_strerror_r(errno, err_buff, sizeof(err_buff)-1); __redis_strerror_r(errno, err_buff, sizeof(err_buff)-1);
MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,
"GET MAAT_VERSION failed %s. Reconnecting...",err_buff); "GET MAAT_VERSION failed %s.",err_buff);
_wrap_redisReconnect(c,logger);
return -1; return -1;
} }
redis_version=read_redis_integer(reply); redis_version=read_redis_integer(reply);
@@ -1709,7 +1727,8 @@ void get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int r
} }
return; return;
} }
void redis_monitor_traverse(long long version,redisContext *c,
void redis_monitor_traverse(long long version, struct maat_redis_ctx* mr_ctx,
void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
void (*finish)(void*),//u_para void (*finish)(void*),//u_para
@@ -1724,39 +1743,45 @@ void redis_monitor_traverse(long long version,redisContext *c,
long long new_version=0; long long new_version=0;
enum MAAT_TABLE_TYPE table_type; enum MAAT_TABLE_TYPE table_type;
void* logger=feather->logger; void* logger=feather->logger;
if(feather->redis_write_ctx!=NULL&&feather->redis_write_ctx->err==0)//authorized to write
if(mr_ctx->write_ctx!=NULL&&mr_ctx->write_ctx->err==0)//authorized to write
{ {
//For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
if(1==redlock_try_lock(feather->redis_read_ctx, rm_expire_lock, rm_expire_lock_timeout)) if(1==redlock_try_lock(mr_ctx->read_ctx, rm_expire_lock, rm_expire_lock_timeout))
{ {
check_maat_expiration(feather->redis_read_ctx, logger); check_maat_expiration(mr_ctx->read_ctx, logger);
cleanup_update_status(feather->redis_read_ctx, logger); cleanup_update_status(mr_ctx->read_ctx, logger);
redlock_unlock(feather->redis_read_ctx,rm_expire_lock); redlock_unlock(mr_ctx->read_ctx, rm_expire_lock);
} }
} }
if(c==NULL) return; if(mr_ctx->read_ctx==NULL||mr_ctx->read_ctx->err)
if(c->err)
{ {
if(time(NULL)-feather->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL) if(time(NULL)-mr_ctx->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL)
{ {
return; return;
} }
feather->last_reconnect_time=time(NULL); mr_ctx->last_reconnect_time=time(NULL);
if(0!=_wrap_redisReconnect(c, logger)) if(mr_ctx->read_ctx!=NULL)
{ {
return; redisFree(mr_ctx->read_ctx);
} }
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Reconnecting...");
mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, feather->logger);
} }
rule_num=get_rm_key_list(c, version, feather->load_version_from, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off); rule_num=get_rm_key_list(mr_ctx->read_ctx, version, feather->load_version_from, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off);
if(rule_num<0)//redis communication error
{
return;
}
feather->load_version_from=0;//only valid for one time. feather->load_version_from=0;//only valid for one time.
if(rule_num<0||(rule_num==0&&update_type==CM_UPDATE_TYPE_INC))//error or nothing changed if(rule_num==0&&update_type==CM_UPDATE_TYPE_INC)//error or nothing changed
{ {
return; return;
} }
if(rule_num>0) if(rule_num>0)
{ {
ret=get_maat_redis_value(c,rule_list,rule_num, logger,0); ret=get_maat_redis_value(mr_ctx->read_ctx, rule_list, rule_num, logger, 0);
if(ret<0) if(ret<0)
{ {
MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update."); MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update.");
@@ -1778,10 +1803,10 @@ void redis_monitor_traverse(long long version,redisContext *c,
{ {
MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num); MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num);
} }
ret=get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, feather, feather->foreign_cont_dir, logger); ret=get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, feather, feather->foreign_cont_dir, logger);
if(ret>0) if(ret>0)
{ {
get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, logger); get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, logger);
} }
} }
@@ -1990,15 +2015,12 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
struct _Maat_table_info_t* p_table=NULL; struct _Maat_table_info_t* p_table=NULL;
long long server_time=0,absolute_expire_time=0; long long server_time=0,absolute_expire_time=0;
const char* p_foreign=NULL; const char* p_foreign=NULL;
int foreign_key_size=0; int foreign_key_size=0;
if(_feather->redis_write_ctx==NULL) redisContext* write_ctx=get_redis_ctx_for_write(_feather);
{
ret=connect_redis_for_write(_feather);
if(write_ctx==NULL) if(write_ctx==NULL)
{ {
return -1; return -1;
} }
}
server_time=redis_server_time(write_ctx); server_time=redis_server_time(write_ctx);
s_rule=(struct serial_rule_t *)calloc(sizeof(struct serial_rule_t),line_num); s_rule=(struct serial_rule_t *)calloc(sizeof(struct serial_rule_t),line_num);
for(i=0;i<line_num;i++) for(i=0;i<line_num;i++)
@@ -2072,7 +2094,7 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru
} }
set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time); set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time);
} }
success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger); success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger);
if(success_cnt<0||success_cnt!=line_num)//error if(success_cnt<0||success_cnt!=line_num)//error
{ {
@@ -2099,7 +2121,7 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule
} }
int Maat_cmd_set_file(Maat_feather_t feather,const char* key, const char* value, size_t size, enum MAAT_OPERATION op) int Maat_cmd_set_file(Maat_feather_t feather,const char* key, const char* value, size_t size, enum MAAT_OPERATION op)
{ {
struct _Maat_feather_t* _feather=(struct _Maat_feather_t*)feather; struct _Maat_feather_t* _feather=(struct _Maat_feather_t*)feather;
redisContext* ctx=_feather->mr_ctx.write_ctx; redisContext* ctx=_feather->mr_ctx.write_ctx;
const char *arg_vec[3]; const char *arg_vec[3];
size_t len_vec[3]; size_t len_vec[3];
@@ -2257,7 +2279,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
int serial_rule_num=0,serial_rule_idx=0; int serial_rule_num=0,serial_rule_idx=0;
UNUSED int transection_success=1; UNUSED int transection_success=1;
struct _Maat_cmd_inner_t* p=NULL,*n=NULL; struct _Maat_cmd_inner_t* p=NULL,*n=NULL;
redisContext* write_ctx=NULL; redisContext* write_ctx=NULL;
redisReply* data_reply=NULL; redisReply* data_reply=NULL;
@@ -2269,26 +2291,22 @@ int Maat_cmd_commit(Maat_feather_t feather)
if(_feather->cmd_q_cnt==0) if(_feather->cmd_q_cnt==0)
{ {
return 0; return 0;
} }
write_ctx=get_redis_ctx_for_write(_feather);
if(write_ctx==NULL) if(write_ctx==NULL)
{
ret=connect_redis_for_write(_feather);
if(ret!=0)
{
{ {
goto error_out;
}
}
} }
for(i=0,p=_feather->cmd_qhead;i<_feather->cmd_q_cnt;i++) for(i=0,p=_feather->cmd_qhead;i<_feather->cmd_q_cnt;i++)
{ {
serial_rule_num+=calculate_serial_rule_num(p, &new_region_num, &new_group_num); serial_rule_num+=calculate_serial_rule_num(p, &new_region_num, &new_group_num);
p=p->next; p=p->next;
} }
_feather->server_time=redis_server_time(write_ctx); _feather->server_time=redis_server_time(write_ctx);
if(_feather->AUTO_NUMBERING_ON==1) if(_feather->AUTO_NUMBERING_ON==1)
{ {
data_reply=_wrap_redisCommand(write_ctx,"INCRBY SEQUENCE_REGION %d",new_region_num); data_reply=_wrap_redisCommand(write_ctx,"INCRBY SEQUENCE_REGION %d",new_region_num);
if(data_reply->type!=REDIS_REPLY_INTEGER) if(data_reply->type!=REDIS_REPLY_INTEGER)
{ {
@@ -2298,7 +2316,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
} }
_feather->base_rgn_seq=data_reply->integer-new_region_num; _feather->base_rgn_seq=data_reply->integer-new_region_num;
freeReplyObject(data_reply); freeReplyObject(data_reply);
data_reply=_wrap_redisCommand(write_ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num); data_reply=_wrap_redisCommand(write_ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num);
if(data_reply->type!=REDIS_REPLY_INTEGER) if(data_reply->type!=REDIS_REPLY_INTEGER)
{ {
@@ -2317,7 +2335,7 @@ int Maat_cmd_commit(Maat_feather_t feather)
p=p->next; p=p->next;
} }
assert(serial_rule_idx==serial_rule_num); assert(serial_rule_idx==serial_rule_num);
transection_success=0; transection_success=0;
transection_success=exec_serial_rule(write_ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger); transection_success=exec_serial_rule(write_ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger);
assert(transection_success==serial_rule_num); assert(transection_success==serial_rule_num);
ret=_feather->cmd_q_cnt; ret=_feather->cmd_q_cnt;
@@ -2344,16 +2362,13 @@ long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment)
{ {
_Maat_feather_t* _feather=(_Maat_feather_t*)feather; _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
redisReply* data_reply=NULL; redisReply* data_reply=NULL;
long long result=0; long long result=0;
int ret=0; redisContext* write_ctx=get_redis_ctx_for_write(_feather);
if(_feather->redis_write_ctx==NULL)
{
ret=connect_redis_for_write(_feather);
if(write_ctx==NULL) if(write_ctx==NULL)
{ {
return -1; return -1;
} }
}
data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", key, increment); data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", key, increment);
if(data_reply->type==REDIS_REPLY_INTEGER) if(data_reply->type==REDIS_REPLY_INTEGER)
{ {
@@ -2370,16 +2385,13 @@ int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsi
{ {
_Maat_feather_t* _feather=(_Maat_feather_t*)feather; _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
redisReply* data_reply=NULL; redisReply* data_reply=NULL;
unsigned int i=0; unsigned int i=0;
int ret=0; redisContext* write_ctx=get_redis_ctx_for_write(_feather);
if(_feather->redis_write_ctx==NULL)
{
ret=connect_redis_for_write(_feather);
if(write_ctx==NULL) if(write_ctx==NULL)
{ {
return -1; return -1;
} }
}
data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d" data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d"
,rm_label_sset ,rm_label_sset
,label_id ,label_id
@@ -2451,17 +2463,14 @@ int Maat_cmd_flushDB(Maat_feather_t feather)
int Maat_cmd_flushDB(Maat_feather_t feather) int Maat_cmd_flushDB(Maat_feather_t feather)
{ {
_Maat_feather_t* _feather=(_Maat_feather_t*)feather; _Maat_feather_t* _feather=(_Maat_feather_t*)feather;
int ret=0; int ret=0;
if(_feather->redis_write_ctx==NULL) redisContext* write_ctx=get_redis_ctx_for_write(_feather);
{
ret=connect_redis_for_write(_feather);
if(write_ctx==NULL) if(write_ctx==NULL)
{ {
return -1; return -1;
}
} }
do do
{ {
ret=redis_flush_DB(_feather->mr_ctx.write_ctx, _feather->mr_ctx.redis_db, _feather->logger); ret=redis_flush_DB(_feather->mr_ctx.write_ctx, _feather->mr_ctx.redis_db, _feather->logger);
}while(ret==0); }while(ret==0);
return 0; return 0;

View File

@@ -3473,7 +3473,7 @@ void *thread_rule_monitor(void *arg)
if(feather->REDIS_MODE_ON==1) if(feather->REDIS_MODE_ON==1)
{ {
redis_monitor_traverse(feather->maat_version redis_monitor_traverse(feather->maat_version
,feather->redis_read_ctx ,&(feather->mr_ctx)
,maat_start_cb ,maat_start_cb
,maat_update_cb ,maat_update_cb
,maat_finish_cb ,maat_finish_cb
@@ -3567,14 +3567,18 @@ void *thread_rule_monitor(void *arg)
alignment_int64_array_free(feather->hit_cnt); alignment_int64_array_free(feather->hit_cnt);
alignment_int64_array_free(feather->orphan_group_saving); alignment_int64_array_free(feather->orphan_group_saving);
alignment_int64_array_free(feather->last_region_saving); alignment_int64_array_free(feather->last_region_saving);
if(feather->REDIS_MODE_ON==1&&feather->redis_read_ctx!=NULL) if(feather->REDIS_MODE_ON==1)
{ {
pthread_mutex_lock(&(feather->redis_write_lock)); if(feather->mr_ctx.read_ctx)
redisFree(feather->redis_read_ctx); {
feather->redis_read_ctx=NULL; redisFree(feather->mr_ctx.read_ctx);
redisFree(feather->redis_write_ctx); feather->mr_ctx.read_ctx=NULL;
feather->redis_write_ctx=NULL; }
pthread_mutex_unlock(&(feather->redis_write_lock)); if(feather->mr_ctx.write_ctx)
{
redisFree(feather->mr_ctx.write_ctx);
feather->mr_ctx.write_ctx=NULL;
}
} }
for(i=0; i<feather->n_tags; i++) for(i=0; i<feather->n_tags; i++)
{ {

View File

@@ -359,6 +359,16 @@ struct rule_tag
char* tag_name; char* tag_name;
char* tag_val; char* tag_val;
}; };
struct maat_redis_ctx
{
redisContext *read_ctx;
redisContext *write_ctx;
char redis_ip[64];
int redis_port;
int redis_db;
time_t last_reconnect_time;
};
struct _Maat_scanner_t struct _Maat_scanner_t
{ {
long long version; long long version;
@@ -416,19 +426,14 @@ struct _Maat_feather_t
pthread_mutex_t backgroud_update_mutex; pthread_mutex_t backgroud_update_mutex;
unsigned char decrypt_key[MAX_TABLE_NAME_LEN]; unsigned char decrypt_key[MAX_TABLE_NAME_LEN];
pthread_t cfg_mon_t; pthread_t cfg_mon_t;
struct maat_redis_ctx mr_ctx;
char redis_ip[MAX_TABLE_NAME_LEN];
int redis_port;
int redis_index;
int AUTO_NUMBERING_ON; int AUTO_NUMBERING_ON;
struct timeval connect_timeout;
redisContext *redis_read_ctx; // redisContext *redis_write_ctx; // not thread safe.
time_t last_reconnect_time;
redisContext *redis_write_ctx; // not thread safe.
int on_redis_writing;
int cmd_q_cnt; int cmd_q_cnt;
struct _Maat_cmd_inner_t* cmd_qhead, *cmd_qtail; struct _Maat_cmd_inner_t* cmd_qhead, *cmd_qtail;
pthread_mutex_t redis_write_lock; //protect redis_write_ctx
long long base_rgn_seq,base_grp_seq,server_time; long long base_rgn_seq,base_grp_seq,server_time;
long long load_version_from; long long load_version_from;
@@ -536,8 +541,9 @@ void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_
void empty_serial_rules(struct serial_rule_t* rule); void empty_serial_rules(struct serial_rule_t* rule);
int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger); int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger);
long long redis_server_time(redisContext* ctx); long long redis_server_time(redisContext* ctx);
redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger);
void redis_monitor_traverse(long long version,redisContext *c, void redis_monitor_traverse(long long version, struct maat_redis_ctx* mr_ctx,
void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para
int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para
void (*finish)(void*),//u_para void (*finish)(void*),//u_para

View File

@@ -38,26 +38,7 @@ static int compare_serial_rule(const void *a, const void *b)
snprintf(q_str,sizeof(q_str),"%s.%ld",rb->table_name,rb->rule_id); snprintf(q_str,sizeof(q_str),"%s.%ld",rb->table_name,rb->rule_id);
return strcmp(p_str,q_str); return strcmp(p_str,q_str);
} }
static redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db)
{
struct timeval connect_timeout;
connect_timeout.tv_sec=0;
connect_timeout.tv_usec=100*1000; // 100 ms
redisReply* reply=NULL;
redisContext * ctx;
ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout);
if(ctx==NULL||ctx->err)
{
printf("Unable to connect %s:%d db%d : %s\n",redis_ip,redis_port,redis_db, ctx==NULL?"Unknown":ctx->errstr);
return NULL;
}
reply=_wrap_redisCommand(ctx, "select %d",redis_db);
freeReplyObject(reply);
return ctx;
}
void read_rule_from_redis(redisContext * ctx, long long desire_version, const char* output_path ,void*logger) void read_rule_from_redis(redisContext * ctx, long long desire_version, const char* output_path ,void*logger)
{ {
struct serial_rule_t* rule_list; struct serial_rule_t* rule_list;
@@ -333,7 +314,7 @@ int main(int argc, char * argv[])
break; break;
} }
} }
ctx=connect_redis(redis_ip,redis_port, redis_db); ctx=connect_redis(redis_ip,redis_port, redis_db, NULL);
if(ctx==NULL) if(ctx==NULL)
{ {
return -1; return -1;