diff --git a/src/entry/Maat_api.cpp b/src/entry/Maat_api.cpp index 8644ca5..73e4b8c 100644 --- a/src/entry/Maat_api.cpp +++ b/src/entry/Maat_api.cpp @@ -442,7 +442,7 @@ Maat_feather_t Maat_feather(int max_thread_num,const char* table_info_path,void* ,max_thread_num); return NULL; } - _Maat_feather_t* feather=(_Maat_feather_t*)calloc(sizeof(struct _Maat_feather_t),1); + _Maat_feather_t* feather=ALLOC(struct _Maat_feather_t, 1); feather->table_cnt=read_table_info(feather->p_table_info, MAX_TABLE_NUM,table_info_path,max_thread_num,logger); feather->map_tablename2id=map_create(); int i=0,j=0,ret=0; @@ -489,15 +489,11 @@ Maat_feather_t Maat_feather(int max_thread_num,const char* table_info_path,void* feather->last_full_version=0; feather->base_grp_seq=0; feather->base_rgn_seq=0; - feather->redis_index=0; feather->AUTO_NUMBERING_ON=1; - feather->connect_timeout.tv_sec=0; - feather->connect_timeout.tv_usec=100*1000; // 100 ms feather->backgroud_update_enabled=1; feather->foreign_cont_linger=0; snprintf(feather->foreign_cont_dir, sizeof(feather->foreign_cont_dir), "%s_files", table_info_path); pthread_mutex_init(&(feather->backgroud_update_mutex),NULL); - pthread_mutex_init(&(feather->redis_write_lock),NULL); snprintf(feather->table_info_fn,sizeof(feather->table_info_fn),"%s",table_info_path); return feather; } @@ -627,20 +623,20 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo memcpy(_feather->decrypt_key,value,size); break; case MAAT_OPT_REDIS_IP: - if((size_t)size>sizeof(_feather->redis_ip)) + if((size_t)size>sizeof(_feather->mr_ctx.redis_ip)) { return -1; } - memcpy(_feather->redis_ip,value,size); + memcpy(_feather->mr_ctx.redis_ip,value,size); break; case MAAT_OPT_REDIS_PORT: if((size_t)size==sizeof(unsigned short)) { - _feather->redis_port=*((unsigned short*)value); + _feather->mr_ctx.redis_port=*((unsigned short*)value); } else if((size_t)size==sizeof(int)) { - _feather->redis_port=*((int*)value); + _feather->mr_ctx.redis_port=*((int*)value); } else { @@ -652,7 +648,7 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo { return -1; } - _feather->redis_index=*((int*)value); + _feather->mr_ctx.redis_db=*((int*)value); break; case MAAT_OPT_CMD_AUTO_NUMBERING: if((size_t)size!=sizeof(int)||*((int*)value)>15||*((int*)value)<0) @@ -700,34 +696,20 @@ int Maat_set_feather_opt(Maat_feather_t feather,enum MAAT_INIT_OPT type,const vo } void maat_read_full_config(_Maat_feather_t* _feather) { - redisReply* reply=NULL; - if(strlen(_feather->redis_ip)>0&&_feather->redis_port!=0) + struct maat_redis_ctx* mr_ctx=&(_feather->mr_ctx); + if(strlen(mr_ctx->redis_ip)>0&&mr_ctx->redis_port!=0) { _feather->REDIS_MODE_ON=1; MESA_handle_runtime_log(_feather->logger,RLOG_LV_INFO,maat_module , "Maat initiate from Redis %s:%hu db%d." - ,_feather->redis_ip - ,_feather->redis_port - ,_feather->redis_index); - _feather->redis_read_ctx=redisConnectWithTimeout(_feather->redis_ip,_feather->redis_port,_feather->connect_timeout); - if(_feather->redis_read_ctx==NULL||_feather->redis_read_ctx->err) + ,mr_ctx->redis_ip + ,mr_ctx->redis_port + ,mr_ctx->redis_db); + mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, _feather->logger); + if(mr_ctx->read_ctx != NULL) { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module - ,"Redis connect %s:%d failed : %s." - ,_feather->redis_ip,_feather->redis_port,_feather->redis_read_ctx==NULL?"Unkonwn":_feather->redis_read_ctx->errstr); - if(_feather->redis_read_ctx!=NULL) - { - redisFree(_feather->redis_write_ctx); - _feather->redis_write_ctx=NULL; - } - } - else - { - redisEnableKeepAlive(_feather->redis_read_ctx); - reply=_wrap_redisCommand(_feather->redis_read_ctx, "select %d",_feather->redis_index); - freeReplyObject(reply); redis_monitor_traverse(_feather->maat_version - ,_feather->redis_read_ctx + ,mr_ctx ,maat_start_cb ,maat_update_cb ,maat_finish_cb @@ -760,8 +742,8 @@ void maat_read_full_config(_Maat_feather_t* _feather) { MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_module , "At initiation: no avilable rule in redis in %s:%hu" - ,_feather->redis_ip - ,_feather->redis_port); + ,mr_ctx->redis_ip + ,mr_ctx->redis_port); } else diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 740776e..1438527 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -58,30 +58,65 @@ redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) } return (redisReply *)reply; } -int connect_redis_for_write(_Maat_feather_t * feather) +redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger) { - int ret=0; + struct timeval connect_timeout; + connect_timeout.tv_sec=0; + connect_timeout.tv_usec=100*1000; // 100 ms redisReply* reply=NULL; - assert(feather->redis_write_ctx==NULL); - feather->redis_write_ctx=redisConnectWithTimeout(feather->redis_ip, feather->redis_port,feather->connect_timeout); - if(feather->redis_write_ctx==NULL||feather->redis_write_ctx->err) + + redisContext * ctx; + ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout); + if(ctx==NULL||ctx->err) { - MESA_handle_runtime_log(feather->logger,RLOG_LV_FATAL,maat_module - ,"Redis connect %s:%d for write failed: %s." - ,feather->redis_ip,feather->redis_port,feather->redis_write_ctx==NULL?"Unkown":feather->redis_write_ctx->errstr); - if(feather->redis_write_ctx!=NULL) + if(logger==NULL) { - redisFree(feather->redis_write_ctx); - feather->redis_write_ctx=NULL; + printf("Unable to connect redis server %s:%d db%d, error: %s\n", + redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr); + } - ret=-1; + else + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Unable to connect redis server %s:%d db%d, error: %s", + redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr); + } + if(ctx!=NULL) redisFree(ctx); + return NULL; + } + redisEnableKeepAlive(ctx); + reply=_wrap_redisCommand(ctx, "select %d",redis_db); + freeReplyObject(reply); + + return ctx; + +} + +int connect_redis_for_write(struct maat_redis_ctx* mr_ctx, void* logger) +{ + assert(mr_ctx->write_ctx==NULL); + mr_ctx->write_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, logger); + if(mr_ctx->write_ctx==NULL) + { + return -1; } else { - reply=_wrap_redisCommand(feather->redis_write_ctx, "select %d",feather->redis_index); - freeReplyObject(reply); + return 0; } - return ret; +} +redisContext* get_redis_ctx_for_write(_Maat_feather_t * feather) +{ + int ret=0; + if(feather->mr_ctx.write_ctx==NULL) + { + ret=connect_redis_for_write(&(feather->mr_ctx), feather->logger); + if(ret!=0) + { + return NULL; + } + } + return feather->mr_ctx.write_ctx; } long long read_redis_integer(const redisReply* reply) { @@ -319,22 +354,6 @@ void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_ } return; } -int _wrap_redisReconnect(redisContext* c, void*logger) -{ - int ret=0; - ret=redisReconnect(c); - if(ret==REDIS_OK) - { - - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect success."); - return 0; - } - else - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"Reconnect failed."); - return -1; - } -} int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger) { redisReply* reply=NULL,*tmp_reply=NULL; @@ -520,11 +539,10 @@ int get_rm_key_list(redisContext *c, long long instance_version, long long desir } else { - memset(err_buff,0,sizeof(err_buff)); - __redis_strerror_r(errno,err_buff,sizeof(err_buff)-1); + memset(err_buff, 0, sizeof(err_buff)); + __redis_strerror_r(errno, err_buff, sizeof(err_buff)-1); MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "GET MAAT_VERSION failed %s. Reconnecting...",err_buff); - _wrap_redisReconnect(c,logger); + "GET MAAT_VERSION failed %s.",err_buff); return -1; } redis_version=read_redis_integer(reply); @@ -1709,7 +1727,8 @@ void get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int r } return; } -void redis_monitor_traverse(long long version,redisContext *c, + +void redis_monitor_traverse(long long version, struct maat_redis_ctx* mr_ctx, void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para void (*finish)(void*),//u_para @@ -1724,39 +1743,49 @@ void redis_monitor_traverse(long long version,redisContext *c, long long new_version=0; enum MAAT_TABLE_TYPE table_type; void* logger=feather->logger; - if(feather->redis_write_ctx!=NULL&&feather->redis_write_ctx->err==0)//authorized to write + + if(mr_ctx->write_ctx!=NULL&&mr_ctx->write_ctx->err==0)//authorized to write { //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. - if(1==redlock_try_lock(feather->redis_read_ctx, rm_expire_lock, rm_expire_lock_timeout)) + if(1==redlock_try_lock(mr_ctx->read_ctx, rm_expire_lock, rm_expire_lock_timeout)) { - check_maat_expiration(feather->redis_read_ctx, logger); - cleanup_update_status(feather->redis_read_ctx, logger); - redlock_unlock(feather->redis_read_ctx,rm_expire_lock); + check_maat_expiration(mr_ctx->read_ctx, logger); + cleanup_update_status(mr_ctx->read_ctx, logger); + redlock_unlock(mr_ctx->read_ctx, rm_expire_lock); } } - if(c==NULL) return; - if(c->err) + if(mr_ctx->read_ctx==NULL||mr_ctx->read_ctx->err) { - if(time(NULL)-feather->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL) + if(time(NULL)-mr_ctx->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL) { return; } - feather->last_reconnect_time=time(NULL); - if(0!=_wrap_redisReconnect(c, logger)) + mr_ctx->last_reconnect_time=time(NULL); + if(mr_ctx->read_ctx!=NULL) + { + redisFree(mr_ctx->read_ctx); + } + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, "Reconnecting..."); + mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, feather->logger); + if(mr_ctx->read_ctx==NULL) { return; } } - rule_num=get_rm_key_list(c, version, feather->load_version_from, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off); + rule_num=get_rm_key_list(mr_ctx->read_ctx, version, feather->load_version_from, &new_version, &rule_list, &update_type, logger, feather->cumulative_update_off); + if(rule_num<0)//redis communication error + { + return; + } feather->load_version_from=0;//only valid for one time. - if(rule_num<0||(rule_num==0&&update_type==CM_UPDATE_TYPE_INC))//error or nothing changed + if(rule_num==0&&update_type==CM_UPDATE_TYPE_INC)//error or nothing changed { return; } if(rule_num>0) { - ret=get_maat_redis_value(c,rule_list,rule_num, logger,0); + ret=get_maat_redis_value(mr_ctx->read_ctx, rule_list, rule_num, logger, 0); if(ret<0) { MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update."); @@ -1778,10 +1807,10 @@ void redis_monitor_traverse(long long version,redisContext *c, { MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num); } - ret=get_foreign_keys_define(c, rule_list, rule_num, feather, feather->foreign_cont_dir, logger); + ret=get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, feather, feather->foreign_cont_dir, logger); if(ret>0) { - get_foreign_conts(c, rule_list, rule_num, 0, logger); + get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, logger); } } start(new_version,update_type,u_para); @@ -1990,15 +2019,12 @@ int Maat_cmd_set_lines(Maat_feather_t feather,const struct Maat_line_t** line_ru long long server_time=0,absolute_expire_time=0; const char* p_foreign=NULL; int foreign_key_size=0; - if(_feather->redis_write_ctx==NULL) + redisContext* write_ctx=get_redis_ctx_for_write(_feather); + if(write_ctx==NULL) { - ret=connect_redis_for_write(_feather); - if(ret!=0) - { - return -1; - } + return -1; } - server_time=redis_server_time(_feather->redis_write_ctx); + server_time=redis_server_time(write_ctx); s_rule=(struct serial_rule_t *)calloc(sizeof(struct serial_rule_t),line_num); for(i=0;irule_id,line_rule[i]->label_id,line_rule[i]->table_name,line_rule[i]->table_line, absolute_expire_time); } - success_cnt=exec_serial_rule(_feather->redis_write_ctx,s_rule, line_num,server_time,_feather->logger); + success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger); if(success_cnt<0||success_cnt!=line_num)//error { ret=-1; @@ -2099,7 +2125,7 @@ int Maat_cmd_set_line(Maat_feather_t feather,const struct Maat_line_t* line_rule int Maat_cmd_set_file(Maat_feather_t feather,const char* key, const char* value, size_t size, enum MAAT_OPERATION op) { struct _Maat_feather_t* _feather=(struct _Maat_feather_t*)feather; - redisContext* ctx=_feather->redis_write_ctx; + redisContext* ctx=_feather->mr_ctx.write_ctx; const char *arg_vec[3]; size_t len_vec[3]; arg_vec[0] = "SET"; @@ -2257,7 +2283,7 @@ int Maat_cmd_commit(Maat_feather_t feather) UNUSED int transection_success=1; struct _Maat_cmd_inner_t* p=NULL,*n=NULL; - redisContext* ctx=NULL; + redisContext* write_ctx=NULL; redisReply* data_reply=NULL; struct serial_rule_t* s_rule=NULL; @@ -2269,26 +2295,22 @@ int Maat_cmd_commit(Maat_feather_t feather) { return 0; } - if(_feather->redis_write_ctx==NULL) + write_ctx=get_redis_ctx_for_write(_feather); + if(write_ctx==NULL) { - ret=connect_redis_for_write(_feather); - if(ret!=0) - { - ret=-1; - goto error_out; - } + goto error_out; + } - ctx=_feather->redis_write_ctx; for(i=0,p=_feather->cmd_qhead;i<_feather->cmd_q_cnt;i++) { serial_rule_num+=calculate_serial_rule_num(p, &new_region_num, &new_group_num); p=p->next; } - _feather->server_time=redis_server_time(ctx); + _feather->server_time=redis_server_time(write_ctx); if(_feather->AUTO_NUMBERING_ON==1) { - data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_REGION %d",new_region_num); + data_reply=_wrap_redisCommand(write_ctx,"INCRBY SEQUENCE_REGION %d",new_region_num); if(data_reply->type!=REDIS_REPLY_INTEGER) { freeReplyObject(data_reply); @@ -2298,7 +2320,7 @@ int Maat_cmd_commit(Maat_feather_t feather) _feather->base_rgn_seq=data_reply->integer-new_region_num; freeReplyObject(data_reply); - data_reply=_wrap_redisCommand(ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num); + data_reply=_wrap_redisCommand(write_ctx,"INCRBY SEQUENCE_GROUP %d",new_group_num); if(data_reply->type!=REDIS_REPLY_INTEGER) { freeReplyObject(data_reply); @@ -2317,7 +2339,7 @@ int Maat_cmd_commit(Maat_feather_t feather) } assert(serial_rule_idx==serial_rule_num); transection_success=0; - transection_success=exec_serial_rule(ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger); + transection_success=exec_serial_rule(write_ctx, s_rule,serial_rule_num,_feather->server_time,_feather->logger); assert(transection_success==serial_rule_num); ret=_feather->cmd_q_cnt; _feather->cmd_acc_num+=_feather->cmd_q_cnt; @@ -2344,16 +2366,13 @@ long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment) _Maat_feather_t* _feather=(_Maat_feather_t*)feather; redisReply* data_reply=NULL; long long result=0; - int ret=0; - if(_feather->redis_write_ctx==NULL) + redisContext* write_ctx=get_redis_ctx_for_write(_feather); + + if(write_ctx==NULL) { - ret=connect_redis_for_write(_feather); - if(ret!=0) - { - return -1; - } + return -1; } - data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"INCRBY %s %d", key, increment); + data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", key, increment); if(data_reply->type==REDIS_REPLY_INTEGER) { result=data_reply->integer; @@ -2370,16 +2389,13 @@ int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsi _Maat_feather_t* _feather=(_Maat_feather_t*)feather; redisReply* data_reply=NULL; unsigned int i=0; - int ret=0; - if(_feather->redis_write_ctx==NULL) + redisContext* write_ctx=get_redis_ctx_for_write(_feather); + if(write_ctx==NULL) { - ret=connect_redis_for_write(_feather); - if(ret!=0) - { - return -1; - } + return -1; } - data_reply=_wrap_redisCommand(_feather->redis_write_ctx,"ZRANGEBYSCORE %s %d %d" + + data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d" ,rm_label_sset ,label_id ,label_id); @@ -2451,17 +2467,14 @@ int Maat_cmd_flushDB(Maat_feather_t feather) { _Maat_feather_t* _feather=(_Maat_feather_t*)feather; int ret=0; - if(_feather->redis_write_ctx==NULL) + redisContext* write_ctx=get_redis_ctx_for_write(_feather); + if(write_ctx==NULL) { - ret=connect_redis_for_write(_feather); - if(ret!=0) - { - return -1; - } + return -1; } do { - ret=redis_flush_DB(_feather->redis_write_ctx, _feather->redis_index, _feather->logger); + ret=redis_flush_DB(_feather->mr_ctx.write_ctx, _feather->mr_ctx.redis_db, _feather->logger); }while(ret==0); return 0; } diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index 9def258..dd8d0cf 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -32,7 +32,7 @@ #include "stream_fuzzy_hash.h" #include "gram_index_engine.h" -int MAAT_FRAME_VERSION_2_4_20181126=1; +int MAAT_FRAME_VERSION_2_4_20181127=1; const char* CHARSET_STRING[]={"NONE","gbk","big5","unicode","utf8","bin", "unicode_ascii_esc","unicode_ascii_aligned","unicode_ncr_dec","unicode_ncr_hex","url_encode_gb2312","url_encode_utf8",""}; @@ -3473,7 +3473,7 @@ void *thread_rule_monitor(void *arg) if(feather->REDIS_MODE_ON==1) { redis_monitor_traverse(feather->maat_version - ,feather->redis_read_ctx + ,&(feather->mr_ctx) ,maat_start_cb ,maat_update_cb ,maat_finish_cb @@ -3567,14 +3567,18 @@ void *thread_rule_monitor(void *arg) alignment_int64_array_free(feather->hit_cnt); alignment_int64_array_free(feather->orphan_group_saving); alignment_int64_array_free(feather->last_region_saving); - if(feather->REDIS_MODE_ON==1&&feather->redis_read_ctx!=NULL) + if(feather->REDIS_MODE_ON==1) { - pthread_mutex_lock(&(feather->redis_write_lock)); - redisFree(feather->redis_read_ctx); - feather->redis_read_ctx=NULL; - redisFree(feather->redis_write_ctx); - feather->redis_write_ctx=NULL; - pthread_mutex_unlock(&(feather->redis_write_lock)); + if(feather->mr_ctx.read_ctx) + { + redisFree(feather->mr_ctx.read_ctx); + feather->mr_ctx.read_ctx=NULL; + } + if(feather->mr_ctx.write_ctx) + { + redisFree(feather->mr_ctx.write_ctx); + feather->mr_ctx.write_ctx=NULL; + } } for(i=0; in_tags; i++) { diff --git a/src/inc_internal/Maat_rule_internal.h b/src/inc_internal/Maat_rule_internal.h index c266116..a20affc 100644 --- a/src/inc_internal/Maat_rule_internal.h +++ b/src/inc_internal/Maat_rule_internal.h @@ -359,6 +359,16 @@ struct rule_tag char* tag_name; char* tag_val; }; +struct maat_redis_ctx +{ + redisContext *read_ctx; + redisContext *write_ctx; + char redis_ip[64]; + int redis_port; + int redis_db; + time_t last_reconnect_time; +}; + struct _Maat_scanner_t { long long version; @@ -416,19 +426,14 @@ struct _Maat_feather_t pthread_mutex_t backgroud_update_mutex; unsigned char decrypt_key[MAX_TABLE_NAME_LEN]; pthread_t cfg_mon_t; + struct maat_redis_ctx mr_ctx; - char redis_ip[MAX_TABLE_NAME_LEN]; - int redis_port; - int redis_index; int AUTO_NUMBERING_ON; - struct timeval connect_timeout; - redisContext *redis_read_ctx; - time_t last_reconnect_time; - redisContext *redis_write_ctx; // not thread safe. - int on_redis_writing; + +// redisContext *redis_write_ctx; // not thread safe. + int cmd_q_cnt; struct _Maat_cmd_inner_t* cmd_qhead, *cmd_qtail; - pthread_mutex_t redis_write_lock; //protect redis_write_ctx long long base_rgn_seq,base_grp_seq,server_time; long long load_version_from; @@ -536,8 +541,9 @@ void set_serial_rule(struct serial_rule_t* rule,enum MAAT_OPERATION op,int rule_ void empty_serial_rules(struct serial_rule_t* rule); int exec_serial_rule(redisContext* ctx,struct serial_rule_t* s_rule,int serial_rule_num, long long server_time, void* logger); long long redis_server_time(redisContext* ctx); +redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger); -void redis_monitor_traverse(long long version,redisContext *c, +void redis_monitor_traverse(long long version, struct maat_redis_ctx* mr_ctx, void (*start)(long long,int ,void*),//vesion,CM_UPDATE_TYPE_*,u_para int (*update)(const char* ,const char*,void* ),//table name ,line ,u_para void (*finish)(void*),//u_para diff --git a/tools/maat_redis_tool.cpp b/tools/maat_redis_tool.cpp index f9340ce..55b1913 100644 --- a/tools/maat_redis_tool.cpp +++ b/tools/maat_redis_tool.cpp @@ -38,26 +38,7 @@ static int compare_serial_rule(const void *a, const void *b) snprintf(q_str,sizeof(q_str),"%s.%ld",rb->table_name,rb->rule_id); return strcmp(p_str,q_str); } -static redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db) -{ - struct timeval connect_timeout; - connect_timeout.tv_sec=0; - connect_timeout.tv_usec=100*1000; // 100 ms - redisReply* reply=NULL; - redisContext * ctx; - ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout); - if(ctx==NULL||ctx->err) - { - printf("Unable to connect %s:%d db%d : %s\n",redis_ip,redis_port,redis_db, ctx==NULL?"Unknown":ctx->errstr); - return NULL; - } - reply=_wrap_redisCommand(ctx, "select %d",redis_db); - freeReplyObject(reply); - - return ctx; - -} void read_rule_from_redis(redisContext * ctx, long long desire_version, const char* output_path ,void*logger) { struct serial_rule_t* rule_list; @@ -333,7 +314,7 @@ int main(int argc, char * argv[]) break; } } - ctx=connect_redis(redis_ip,redis_port, redis_db); + ctx=connect_redis(redis_ip,redis_port, redis_db, NULL); if(ctx==NULL) { return -1;