From fd88b6a266ec38d8d5663eeb3429a36970cea099 Mon Sep 17 00:00:00 2001 From: zhengchao Date: Tue, 26 Apr 2022 16:34:54 +0800 Subject: [PATCH] =?UTF-8?q?EX=5Fdata=5Frt=5Frow2EX=5Fdata=E8=BF=94?= =?UTF-8?q?=E5=9B=9E-1=E6=97=B6=EF=BC=8Cip=5Frule=E5=92=8Cfqdn=5Frule?= =?UTF-8?q?=E5=86=85=E5=AD=98=E6=9C=AA=E9=87=8A=E6=94=BE=EF=BC=8C=E5=AF=BC?= =?UTF-8?q?=E8=87=B4=E5=86=85=E5=AD=98=E6=B3=84=E6=BC=8F=20TSG-10475?= =?UTF-8?q?=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/entry/Maat_command.cpp | 4959 +++++++++++++++--------------- src/entry/Maat_rule.cpp | 2 +- src/entry/Maat_table_runtime.cpp | 24 +- 3 files changed, 2501 insertions(+), 2484 deletions(-) diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 16c777d..4ca6baa 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -1,2479 +1,2480 @@ -#include "Maat_command.h" -#include "Maat_rule.h" -#include "Maat_rule_internal.h" -#include "Maat_utils.h" -#include "config_monitor.h" -#include "map_str2int.h" -#include "hiredis.h" -#include -#include -#include -#include -#include - -#define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR")) -#define maat_command (module_name_str("MAAT_COMMAND")) -const time_t MAAT_REDIS_RECONNECT_INTERVAL=5; -const char* mr_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"}; -const char* mr_status_sset="MAAT_UPDATE_STATUS"; -const char* mr_expire_sset="MAAT_EXPIRE_TIMER"; -const char* mr_label_sset="MAAT_LABEL_INDEX"; -const char* mr_version_sset="MAAT_VERSION_TIMER"; -const char* mr_expire_lock="EXPIRE_OP_LOCK"; -const long mr_expire_lock_timeout=300*1000; -const static int MAAT_REDIS_SYNC_TIME=30*60; -const char* mr_op_str[]={"DEL","ADD","RENEW_TIMEOUT"}; -const char* foreign_source_prefix="redis://"; -const char* foreign_key_prefix="__FILE_"; - - - -int _wrap_redisGetReply(redisContext *c, redisReply **reply) -{ - return redisGetReply(c, (void **)reply); -} -redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) -{ - va_list ap; - void *reply = NULL; - int ret=REDIS_ERR, retry=0; - while(reply==NULL&&retry<2&&ret!=REDIS_OK) - { - va_start(ap,format); - reply = redisvCommand(c,format,ap); - va_end(ap); - if(reply==NULL) - { - ret=redisReconnect(c); - retry++; - } - } - return (redisReply *)reply; -} -redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger) -{ - struct timeval connect_timeout; - connect_timeout.tv_sec=0; - connect_timeout.tv_usec=100*1000; // 100 ms - redisReply* reply=NULL; - - redisContext * ctx; - ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout); - if(ctx==NULL||ctx->err) - { - if(logger==NULL) - { - printf("Unable to connect redis server %s:%d db%d, error: %s\n", - redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr); - - } - else - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Unable to connect redis server %s:%d db%d, error: %s", - redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr); - } - if(ctx!=NULL) redisFree(ctx); - return NULL; - } - redisEnableKeepAlive(ctx); - reply=_wrap_redisCommand(ctx, "select %d",redis_db); - freeReplyObject(reply); - reply=NULL; - - return ctx; - -} - -int connect_redis_for_write(struct source_redis_ctx* mr_ctx, void* logger) -{ - assert(mr_ctx->write_ctx==NULL); - mr_ctx->write_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, logger); - if(mr_ctx->write_ctx==NULL) - { - return -1; - } - else - { - return 0; - } -} -redisContext* get_redis_ctx_for_write(struct _Maat_feather_t * feather) -{ - int ret=0; - if(feather->mr_ctx.write_ctx==NULL) - { - ret=connect_redis_for_write(&(feather->mr_ctx), feather->logger); - if(ret!=0) - { - return NULL; - } - } - return feather->mr_ctx.write_ctx; -} -long long read_redis_integer(const redisReply* reply) -{ - switch(reply->type) - { - case REDIS_REPLY_INTEGER: - return reply->integer; - break; - case REDIS_REPLY_ARRAY: - assert(reply->element[0]->type==REDIS_REPLY_INTEGER); - return reply->element[0]->integer; - break; - case REDIS_REPLY_STRING: - return atoll(reply->str); - break; - default: - return -1; - break; - } - return 0; -} -long long redis_server_time(redisContext* ctx) -{ - long long server_time=0; - redisReply* data_reply=NULL; - data_reply=_wrap_redisCommand(ctx,"TIME"); - if(data_reply->type==REDIS_REPLY_ARRAY) - { - server_time=atoll(data_reply->element[0]->str); - freeReplyObject(data_reply); - data_reply=NULL; - } - return server_time; -} -enum MAAT_TABLE_TYPE type_region2table(const struct Maat_region_t* p) -{ - enum MAAT_TABLE_TYPE ret=TABLE_TYPE_IP; - switch(p->region_type) - { - case REGION_IP: - ret=TABLE_TYPE_IP; - break; - case REGION_EXPR: - if(p->expr_rule.district==NULL) - { - ret=TABLE_TYPE_EXPR; - } - else - { - ret=TABLE_TYPE_EXPR_PLUS; - } - break; - case REGION_INTERVAL: - if(p->interval_rule.district==NULL) - { - ret=TABLE_TYPE_INTERVAL; - } - else - { - ret=TABLE_TYPE_INTERVAL_PLUS; - } - break; - case REGION_DIGEST: - ret=TABLE_TYPE_DIGEST; - break; - case REGION_SIMILARITY: - ret=TABLE_TYPE_SIMILARITY; - break; - default: - assert(0); - } - return ret; -} -int get_valid_flag_offset(const char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq) -{ - size_t offset=0, len=0; - unsigned int column_seq=0, ret=0; - switch(type) - { - case TABLE_TYPE_EXPR: - column_seq=7; - break; - case TABLE_TYPE_IP: - column_seq=14; - break; - case TABLE_TYPE_IP_PLUS: - column_seq=18; - break; - case TABLE_TYPE_COMPILE: - column_seq=8; - break; - case TABLE_TYPE_PLUGIN: - case TABLE_TYPE_IP_PLUGIN: - case TABLE_TYPE_FQDN_PLUGIN: - if(valid_column_seq<0) - { - return -1; - } - column_seq=(unsigned int)valid_column_seq; - break; - case TABLE_TYPE_INTERVAL: - column_seq=5; - break; - case TABLE_TYPE_INTERVAL_PLUS: - column_seq=6; - break; - case TABLE_TYPE_DIGEST: - column_seq=6; - break; - case TABLE_TYPE_SIMILARITY: - column_seq=5; - break; - case TABLE_TYPE_EXPR_PLUS: - column_seq=8; - break; - case TABLE_TYPE_GROUP2COMPILE: - case TABLE_TYPE_GROUP2GROUP: - column_seq=3; - break; - default: - assert(0); - } - - ret=get_column_pos(line, column_seq, &offset, &len); - if(ret<0||offset>=strlen(line)||(line[offset]!='1'&&line[offset]!='0'))// 0 is also a valid value for some non-MAAT producer. - { - return -1; - } - return offset; -} -int invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq) -{ - int i=0; - i=get_valid_flag_offset(line, type,valid_column_seq); - if(i<0) - { - return -1; - } - line[i]='0'; - return 0; -} - -void serialize_group2group(enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g, char* buff, size_t sz) -{ - snprintf(buff, sz, "%d\t%d\t%d", g2g->group_id, - g2g->superior_group_id, - op); - return; -} -void serialize_group2compile(enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c, char* buff, size_t sz) -{ - snprintf(buff, sz, "%d\t%d\t%d\t%d\t%s\t%d", g2c->group_id, - g2c->compile_id, - op, - g2c->not_flag, - g2c->virtual_table_name?g2c->virtual_table_name:"null", - g2c->clause_index); - return; -} -void serialize_compile(const struct Maat_rule_t* p_m_rule, const char* huge_service_defined, int clause_num, enum MAAT_OPERATION op, char* buff, size_t sz) -{ - if(op==MAAT_OP_RENEW_TIMEOUT) op=MAAT_OP_ADD; - const char* service_define=huge_service_defined?huge_service_defined:(strlen(p_m_rule->service_defined)?p_m_rule->service_defined:"null"); - - snprintf(buff, sz, "%d\t%d\t%hhu\t%hhu\t%hhu\t0\t%s\t%d\t%d", - p_m_rule->config_id, - p_m_rule->service_id, - p_m_rule->action, - p_m_rule->do_blacklist, - p_m_rule->do_log, - service_define, - op, - clause_num); - return; -} -void serialize_region(const struct Maat_cmd_region* p, int group_id, char* buff, size_t sz) -{ - UNUSED size_t ret=0; - switch(p->region_type) - { - case REGION_IP: - ret=snprintf(buff, sz, "%d\t%d\t%d\t%s\t%s\t%hu\t%hu\t%s\t%s\t%hu\t%hu\t%d\t%d\t1", - p->region_id, - group_id, - p->ip_rule.addr_type, - p->ip_rule.src_ip, - p->ip_rule.mask_src_ip, - p->ip_rule.src_port, - p->ip_rule.mask_src_port, - p->ip_rule.dst_ip, - p->ip_rule.mask_dst_ip, - p->ip_rule.dst_port, - p->ip_rule.mask_dst_port, - p->ip_rule.protocol, - p->ip_rule.direction); - break; - case REGION_IP_PLUS: - ret=snprintf(buff, sz, "%d\t%d\t%d\t%s\t%s\t%s\t%s\t%hu\t%hu\t%s\t%s\t%s\t%s\t%hu\t%hu\t%d\t%d\t1", - p->region_id, - group_id, - p->ip_plus_rule.addr_type, - p->ip_plus_rule.saddr_format, - p->ip_plus_rule.src_ip1, - p->ip_plus_rule.src_ip2, - p->ip_plus_rule.sport_format, - p->ip_plus_rule.src_port1, - p->ip_plus_rule.src_port2, - p->ip_plus_rule.daddr_format, - p->ip_plus_rule.dst_ip1, - p->ip_plus_rule.dst_ip2, - p->ip_plus_rule.dport_format, - p->ip_plus_rule.dst_port1, - p->ip_plus_rule.dst_port2, - p->ip_plus_rule.protocol, - p->ip_plus_rule.direction); - break; - case REGION_EXPR: - if(p->expr_rule.district==NULL) - { - ret=snprintf(buff,sz,"%d\t%d\t%s\t%d\t%d\t%d\t1", - p->region_id, - group_id, - p->expr_rule.keywords, - p->expr_rule.expr_type, - p->expr_rule.match_method, - p->expr_rule.hex_bin); - } - else //expr_plus - { - ret=snprintf(buff,sz,"%d\t%d\t%s\t%s\t%d\t%d\t%d\t1", - p->region_id, - group_id, - p->expr_rule.district, - p->expr_rule.keywords, - p->expr_rule.expr_type, - p->expr_rule.match_method, - p->expr_rule.hex_bin); - } - break; - case REGION_INTERVAL: - ret=snprintf(buff,sz,"%d\t%d\t%u\t%u\t1", - p->region_id, - group_id, - p->interval_rule.low_boundary, - p->interval_rule.up_boundary); - break; - case REGION_DIGEST: - ret=snprintf(buff,sz,"%d\t%d\t%llu\t%s\t%hd\t1", - p->region_id, - group_id, - p->digest_rule.orgin_len, - p->digest_rule.digest_string, - p->digest_rule.confidence_degree); - break; - case REGION_SIMILARITY: - ret=snprintf(buff,sz,"%d\t%d\t%s\t%hd\t1", - p->region_id, - group_id, - p->similarity_rule.target, - p->similarity_rule.threshold); - break; - default: - assert(0); - } - assert(rettable_line!=NULL) - { - free(rule->table_line); - } - if(rule->n_foreign>0) - { - for(int i=0; in_foreign; i++) - { - free(rule->f_keys[i].filename); - free(rule->f_keys[i].key); - } - free(rule->f_keys); - } - memset(rule,0,sizeof(struct serial_rule_t)); - return; -} -void set_serial_rule(struct serial_rule_t* rule, enum MAAT_OPERATION op, unsigned long rule_id,int label_id,const char* table_name,const char* line, long long timeout) -{ - memset(rule, 0, sizeof(struct serial_rule_t)); - rule->op=op; - rule->rule_id=rule_id; - rule->label_id=label_id; - rule->timeout=timeout; - assert(strlen(table_name)table_name)); - strncpy(rule->table_name, table_name, sizeof(rule->table_name)); - if(line!=NULL) - { - rule->table_line=_maat_strdup(line); - } - return; -} -int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger) -{ - redisReply* reply=NULL,*tmp_reply=NULL; - char err_buff[256], op_str[4]; - int rule_num=0; - UNUSED int ret=0; - unsigned int i=0, j=0; - long long nearest_rule_version; - struct serial_rule_t *s_rule=NULL; - - //Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version. - //The elements are considered to be ordered from low to high scores(instance_version). - reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",mr_status_sset,instance_version,target_version); - - if(reply==NULL) - { - __redis_strerror_r(errno,err_buff,sizeof(err_buff)-1); - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "GET %s failed %s.",mr_status_sset,err_buff); - return -1; - } - assert(reply->type==REDIS_REPLY_ARRAY); - rule_num=reply->elements; - if(reply->elements==0) - { - freeReplyObject(reply); - reply=NULL; - return 0; - } - - tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",mr_status_sset,reply->element[0]->str); - if(tmp_reply->type!=REDIS_REPLY_STRING) - { - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "ZSCORE %s %s failed Version: %lld->%lld",mr_status_sset,reply->element[0]->str,instance_version, target_version); - freeReplyObject(tmp_reply); - tmp_reply=NULL; - freeReplyObject(reply); - reply=NULL; - return -1; - } - nearest_rule_version=read_redis_integer(tmp_reply); - freeReplyObject(tmp_reply); - tmp_reply=NULL; - if(nearest_rule_version<0) - { - return -1; - } - if(nearest_rule_version!=instance_version+1) - { - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,instance_version); - } - s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); - for(i=0, j=0;ielements;i++) - { - assert(reply->element[i]->type==REDIS_REPLY_STRING); - ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%lu",op_str,s_rule[j].table_name,&(s_rule[j].rule_id)); - if(ret!=3||s_rule[i].rule_id<0) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Invalid Redis Key: %s",reply->element[i]->str); - continue; - } - if(strncmp(op_str,"ADD",strlen("ADD"))==0) - { - s_rule[j].op=MAAT_OP_ADD; - } - else if(strncmp(op_str,"DEL",strlen("DEL"))==0) - { - s_rule[j].op=MAAT_OP_DEL; - } - else - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Invalid Redis Key: %s",reply->element[i]->str); - continue; - } - j++; - } - rule_num=j; - *list=s_rule; - freeReplyObject(reply); - reply=NULL; - - return rule_num; -} -struct s_rule_array_t -{ - int cnt; - int size; - struct serial_rule_t* array; -}; -void save_serial_rule_cb(const uchar * key, uint size, void * data, void * user) -{ - struct s_rule_array_t* array=(struct s_rule_array_t*)user; - int i=array->cnt; - memcpy(&(array->array[i]),data,sizeof(struct serial_rule_t)); - array->array[i].op=MAAT_OP_ADD; - return; -} -int recovery_history_version(const struct serial_rule_t* current, int current_num, const struct serial_rule_t* changed, int changed_num, struct serial_rule_t** history_result) -{ - int i=0,ret=0; - unsigned int history_num=0; - int hash_slot_size=1; - MESA_htable_handle htable=NULL; - MESA_htable_create_args_t hargs; - struct s_rule_array_t tmp_array; - char hkey[256+20]; - int tmp=current_num+changed_num; - for(;tmp>0;tmp=tmp/2) - { - hash_slot_size*=2; - } - - memset(&hargs,0,sizeof(hargs)); - hargs.thread_safe=0; - hargs.hash_slot_size = hash_slot_size; - hargs.max_elem_num = 0; - hargs.eliminate_type = HASH_ELIMINATE_ALGO_FIFO; - hargs.expire_time = 0; - hargs.key_comp = NULL; - hargs.key2index = NULL; - hargs.recursive = 1; - hargs.data_free = NULL;//data is an reference, no need to free. - hargs.data_expire_with_condition = NULL; - htable=MESA_htable_create(&hargs, sizeof(hargs)); - MESA_htable_print_crtl(htable, 0); - - for(i=0;i0); - } - - for(i=changed_num-1;i>=0;i--) - { - snprintf(hkey,sizeof(hkey),"%ld,%s",changed[i].rule_id,changed[i].table_name); - if(changed[i].op==MAAT_OP_ADD)//newly added rule is need to delete from current, so that history version can be recovered. - { - ret=MESA_htable_del(htable, (uchar*)hkey, strlen(hkey), NULL); - } - else - { - ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey),changed+i); - } - if(ret<0)//failed - { - goto error_out; - } - } - history_num=MESA_htable_get_elem_num(htable); - tmp_array.cnt=0; - tmp_array.size=history_num; - tmp_array.array=(struct serial_rule_t*)calloc(history_num,sizeof(struct serial_rule_t)); - MESA_htable_iterate(htable, save_serial_rule_cb, &tmp_array); - *history_result=tmp_array.array; - ret=history_num; -error_out: - MESA_htable_destroy(htable, NULL); - return ret; -} - -int get_rm_key_list(redisContext *c, long long instance_version, long long desired_version, long long* new_version, struct Maat_table_manager* table_mgr, struct serial_rule_t** list,int *update_type, void* logger, int cumulative_off) -{ - redisReply* reply=NULL,*sub_reply=NULL; - char err_buff[256]; - long long redis_version=0,target_version=0; - int rule_num=0, changed_rule_num=0, table_id=0; - int ret=0; - unsigned int i=0,full_idx =0,append_cmd_cnt=0; - struct serial_rule_t *s_rule_array=NULL, *changed_rule_array=NULL, *history_rule_array=NULL; - - reply=(redisReply*)redisCommand(c, "GET MAAT_VERSION"); - if(reply!=NULL) - { - - if(reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"GET MAAT_VERSION failed, maybe Redis is busy."); - freeReplyObject(reply); - reply=NULL; - return -1; - } - } - else - { - memset(err_buff, 0, sizeof(err_buff)); - __redis_strerror_r(errno, err_buff, sizeof(err_buff)-1); - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "GET MAAT_VERSION failed %s.",err_buff); - return -1; - } - redis_version=read_redis_integer(reply); - if(redis_version<0) - { - if(reply->type==REDIS_REPLY_ERROR) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Redis Communication error: %s.",reply->str); - } - return -1; - } - freeReplyObject(reply); - reply=NULL; - if(redis_version==instance_version) - { - return 0; - } - - if(instance_version==0||desired_version!=0) - { - goto FULL_UPDATE; - } - if(redis_version Redis: %lld.",instance_version,redis_version); - goto FULL_UPDATE; - } - if(redis_version>instance_version&&cumulative_off==1) - { - target_version=instance_version; - } - else - { - target_version=redis_version-1; - } - do{ - target_version++; - rule_num=get_inc_key_list(instance_version, target_version, c, &s_rule_array,logger); - if(rule_num>0) - { - break; - } - else if(rule_num<0) - { - goto FULL_UPDATE; - } - else - { - //ret=0, nothing to do. - } - - }while(rule_num==0&&target_version<=redis_version&&cumulative_off==1); - if(rule_num==0) - { - MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s" - ,mr_status_sset,instance_version,target_version-1,cumulative_off==1?"OFF":"ON"); - return 0; - } - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Inc Update from instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num); - *list=s_rule_array; - *update_type=CM_UPDATE_TYPE_INC; - *new_version=target_version; - return rule_num; - -FULL_UPDATE: - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Initiate full udpate from instance_version %d to %lld.",instance_version,desired_version==0?redis_version:desired_version); - append_cmd_cnt=0; - ret=redisAppendCommand(c, "MULTI"); - append_cmd_cnt++; - ret=redisAppendCommand(c, "GET MAAT_VERSION"); - append_cmd_cnt++; - ret=redisAppendCommand(c, "KEYS EFFECTIVE_RULE:*"); - append_cmd_cnt++; - //consume reply "OK" and "QUEUED". - for(i=0;ierrstr); - return -1; - } - if(reply->type!=REDIS_REPLY_ARRAY) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Invalid Redis Key List type %d", reply->type); - freeReplyObject(reply); - reply=NULL; - return -1; - } - *new_version=read_redis_integer(reply->element[0]); - sub_reply=reply->element[1]; - if(sub_reply->type!=REDIS_REPLY_ARRAY) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Invalid Redis Key List type %d", sub_reply->type); - freeReplyObject(reply); - reply=NULL; - return -1; - } - - s_rule_array=(struct serial_rule_t*)calloc(sub_reply->elements,sizeof(struct serial_rule_t)); - for(i=0, full_idx=0; ielements; i++) - { - if(sub_reply->element[i]->type!=REDIS_REPLY_STRING) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Invalid Redis Key Type: %d", sub_reply->element[i]->type); - continue; - } - ret=sscanf(sub_reply->element[i]->str,"%*[^:]:%[^,],%ld",s_rule_array[full_idx].table_name,&(s_rule_array[full_idx].rule_id)); - s_rule_array[full_idx].op=MAAT_OP_ADD; - if(ret!=2||s_rule_array[full_idx].rule_id<0||strlen(s_rule_array[full_idx].table_name)==0) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Invalid Redis Key Format: %s", sub_reply->element[i]->str); - continue; - } - if(table_mgr) - { - table_id=Maat_table_get_id_by_name(table_mgr, s_rule_array[full_idx].table_name); - if(table_id<0)//Unrecognized table. - { - continue; - } - } - full_idx++; - } - rule_num=full_idx; - freeReplyObject(reply); - reply=NULL; - if(desired_version!=0) - { - changed_rule_num=get_inc_key_list(desired_version, redis_version, c, &changed_rule_array, logger); - if(changed_rule_num<0) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Recover history version %lld faild where as redis version is %lld.", desired_version, redis_version); - } - else if(changed_rule_num==0) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Nothing to recover from history version %lld to redis version is %lld.", desired_version, redis_version); - } - else - { - ret=recovery_history_version(s_rule_array, full_idx, changed_rule_array, changed_rule_num, &history_rule_array); - if(ret>0) - { - free(s_rule_array); - s_rule_array=history_rule_array; - rule_num=ret; - *new_version=desired_version; - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Successfully recovered from history version %lld to redis version is %lld.", desired_version, redis_version); - } - } - free(changed_rule_array); - } - *list=s_rule_array; - *update_type=CM_UPDATE_TYPE_FULL; - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "Full update %d keys of version %lld.", rule_num, *new_version); - - return rule_num ; -} - -int _get_maat_redis_value(redisContext *c, struct serial_rule_t* rule_list, int rule_num, void* logger) -{ - int i=0,failed_cnt=0,idx=0; - UNUSED int ret=0; - int error_happened=0; - int *retry_ids=(int*)malloc(sizeof(int)*rule_num); - char redis_cmd[256]; - redisReply* reply=NULL; - for(i=0;itype==REDIS_REPLY_STRING) - { - rule_list[i].table_line=_maat_strdup(reply->str); - } - else - { - if(reply->type==REDIS_REPLY_NIL) - { - retry_ids[failed_cnt]=i; - failed_cnt++; - } - else - { - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor - ,"Redis GET %s:%s,%d failed",mr_key_prefix[rule_list[i].op] - ,rule_list[i].table_name - ,rule_list[i].rule_id); - error_happened=1; - } - } - freeReplyObject(reply); - reply=NULL; - } - if(error_happened==1) - { - free(retry_ids); - return -1; - } - - for(i=0;itype==REDIS_REPLY_STRING) - { - rule_list[idx].table_line=_maat_strdup(reply->str); - } - else if(reply->type==REDIS_REPLY_ERROR)//Deal with Redis response: "Loading Redis is loading the database in memory" - { - MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor, - "redis command %s error, reply type=%d, error str=%s", redis_cmd, reply->type, reply->str); - } - else //Handle type "nil" - { - MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor, - "redis command %s failed, reply type=%d", redis_cmd, reply->type); - } - - freeReplyObject(reply); - reply=NULL; - - } - free(retry_ids); - return 0; -} -int get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger,int print_process) -{ - int max_redis_batch=4*1024,batch_cnt=0; - int success_cnt=0,ret=0; - int next_print=10; - while(success_cntnext_print) - { - printf(" >%d%%",next_print); - next_print+=10; - } - } - } - if(print_process==1) - { - printf(" >100%%\n"); - } - return 0; -} - -int mr_transaction_success(redisReply* data_reply) -{ - if(data_reply->type==REDIS_REPLY_NIL) - { - return 0; - } - else - { - return 1; - } -} - -int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire) -{ - redisReply* reply=NULL; - int ret=0; - reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire); - if(reply->type==REDIS_REPLY_NIL) - { - ret=0; - } - else - { - ret=1; - } - freeReplyObject(reply); - reply=NULL; - - return ret; -} -void redlock_unlock(redisContext * ctx, const char * lock_name) -{ - redisReply* reply=NULL; - reply=_wrap_redisCommand(ctx,"DEL %s", lock_name); - freeReplyObject(reply); - reply=NULL; - -} -#define POSSIBLE_REDIS_REPLY_SIZE 2 -struct expected_reply -{ - int srule_seq; - int possible_reply_num; - redisReply possible_replies[POSSIBLE_REDIS_REPLY_SIZE]; -}; -void expected_reply_add(struct expected_reply* expected, int srule_seq, int type, long long integer) -{ - int i=expected->possible_reply_num; - assert(isrule_seq=srule_seq; - expected->possible_replies[i].type=type; - expected->possible_replies[i].integer=integer; - expected->possible_reply_num++; -} -int mr_operation_success(redisReply* actual_reply, struct expected_reply* expected) -{ - int i=0; - if(expected->possible_replies[0].type!=actual_reply->type) - { - return 0; - } - for(i=0; i< expected->possible_reply_num; i++) - { - if(expected->possible_replies[i].type==REDIS_REPLY_INTEGER && - expected->possible_replies[i].type==actual_reply->type && - expected->possible_replies[i].integer==actual_reply->integer) - { - return 1; - } - if(expected->possible_replies[i].type==REDIS_REPLY_STATUS && - expected->possible_replies[i].type==actual_reply->type && - 0==strcasecmp(actual_reply->str, "OK")) - { - return 1; - } - } - return 0; - -} - -long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *transaction_version) -{ - int ret=-1; - redisReply* data_reply=NULL; - if(renew_rule_num>0) - { - while(0==redlock_try_lock(ctx, mr_expire_lock, mr_expire_lock_timeout)) - { - usleep(1000); - } - *renew_allowed=1; - } - if(rule_num>renew_rule_num) - { - data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1"); - *transaction_version=read_redis_integer(data_reply); - freeReplyObject(data_reply); - data_reply=NULL; - if(*transaction_version<0) - { - return -1; - } - } - if(*renew_allowed==1||rule_num>renew_rule_num) - { - data_reply=_wrap_redisCommand(ctx,"MULTI"); - freeReplyObject(data_reply); - data_reply=NULL; - ret=0; - } - return ret; -} -//parameters: 4 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER MAAT_TRANSACTION_xx, 1 args: SERVER_TIME -const char* lua_exec_done= -"local maat_version=redis.call(\'incrby\', KEYS[1], 1);" -"local transaction=redis.call(\'lrange\', KEYS[4], 0, -1);" -"for k,v in pairs(transaction) do" -" redis.call(\'zadd\', KEYS[2], maat_version, v);" -"end;" -"redis.call(\'del\', KEYS[4]);" -"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);" -"return maat_version;"; -redisReply* _exec_serial_rule_end(redisContext* ctx, const char* transaction_list, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt) -{ - redisReply* data_reply=NULL; - if(renew_allowed==1) - { - redlock_unlock(ctx, mr_expire_lock); - expect_reply[*cnt].srule_seq=-1; - (*cnt)++; - } - if(strlen(transaction_list)>0) - { - data_reply=_wrap_redisCommand(ctx, "eval %s 4 MAAT_VERSION %s %s %s %lld", - lua_exec_done, - mr_status_sset, - mr_version_sset, - transaction_list, - server_time); - freeReplyObject(data_reply); - data_reply=NULL; - expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); - (*cnt)++; - } - data_reply=_wrap_redisCommand(ctx,"EXEC"); - return data_reply; -} - -void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed) -{ - redisReply* data_reply=NULL; - unsigned int append_cmd_cnt=0, i=0; - for(i=0;i0) - { - redisAppendCommand(ctx,"ZADD %s %lld %s,%lu", - mr_expire_sset, - s_rule[i].timeout, - s_rule[i].table_name, - s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0); - (*cnt)++; - append_cmd_cnt++; - } - if(s_rule[i].label_id>0) - { - redisAppendCommand(ctx,"ZADD %s %d %s,%lu", - mr_label_sset, - s_rule[i].label_id, - s_rule[i].table_name, - s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0); - - (*cnt)++; - - append_cmd_cnt++; - } - break; - case MAAT_OP_DEL: - redisAppendCommand(ctx,"RENAME %s:%s,%lu %s:%s,%lu", - mr_key_prefix[MAAT_OP_ADD], - s_rule[i].table_name, - s_rule[i].rule_id, - mr_key_prefix[MAAT_OP_DEL], - s_rule[i].table_name, - s_rule[i].rule_id - ); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0); - (*cnt)++; - append_cmd_cnt++; - - redisAppendCommand(ctx,"EXPIRE %s:%s,%lu %d", - mr_key_prefix[MAAT_OP_DEL], - s_rule[i].table_name, - s_rule[i].rule_id, - MAAT_REDIS_SYNC_TIME); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); - (*cnt)++; - append_cmd_cnt++; - - //NX: Don't update already exisiting elements. Always add new elements. - redisAppendCommand(ctx,"RPUSH %s DEL,%s,%lu", - transaction_list, - s_rule[i].table_name, - s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); - (*cnt)++; - append_cmd_cnt++; - - // Try to remove from expiration sorted set, no matter wheather it exists or not. - redisAppendCommand(ctx,"ZREM %s %s,%lu", - mr_expire_sset, - s_rule[i].table_name, - s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); - (*cnt)++; - append_cmd_cnt++; - - // Try to remove from label sorted set, no matter wheather it exists or not. - redisAppendCommand(ctx,"ZREM %s %s,%lu", - mr_label_sset, - s_rule[i].table_name, - s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); - (*cnt)++; - append_cmd_cnt++; - break; - case MAAT_OP_RENEW_TIMEOUT: - if(renew_allowed!=1) - { - continue; - } - //s_rule[i].timeout>0 was checked by caller. - redisAppendCommand(ctx,"ZADD %s %lld %s,%lu", - mr_expire_sset, - s_rule[i].timeout, - s_rule[i].table_name, - s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); - (*cnt)++; - append_cmd_cnt++; - - break; - default: - assert(0); - break; - } - } - for(i=0;i0) - { - snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", transaction_version); - } - while(success_cntelements==multi_cmd_cnt); - for(i=0;ielement[i]; - //failed is acceptable - //or transaciton is success - //or continuation of last failed - if(expected_reply[i].srule_seq==-1||1==mr_operation_success(p, expected_reply+i)||last_failed==expected_reply[i].srule_seq) - { - continue; - } - rule_seq=expected_reply[i].srule_seq; - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_command, - "%s %s %d failed, rule id maybe conflict or not exist.", - mr_op_str[s_rule[rule_seq].op], - s_rule[rule_seq].table_name,s_rule[rule_seq].rule_id); - success_cnt--; - last_failed=rule_seq; - } - } - else - { - success_cnt=-1; - } - if(transaction_version>0) - { - transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]); - MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_command, - "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld ", - transaction_version, - transaction_finished_version); - } - - freeReplyObject(transaction_reply); - transaction_reply=NULL; - -error_out: - if(renew_num>0&&renew_allowed!=1) - { - for(i=0;i<(unsigned int)serial_rule_num;i++) - { - if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT) - { - MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command - ,"%s %s %d is not allowed due to lock contention.",mr_op_str[MAAT_OP_RENEW_TIMEOUT] - , s_rule[i].table_name,s_rule[i].rule_id); - } - } - if(success_cnt>0) - { - success_cnt-=renew_num; - } - } - free(expected_reply); - return success_cnt; -} - - -void check_maat_expiration(redisContext *ctx, void *logger) -{ - unsigned int i=0,s_rule_num=0; - UNUSED int ret=0; - int success_cnt=0; - redisReply* data_reply=NULL; - struct serial_rule_t* s_rule=NULL; - long long server_time=0; - - server_time=redis_server_time(ctx); - if(!server_time) - { - return; - } - data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_expire_sset,server_time); - if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0) - { - freeReplyObject(data_reply); - data_reply=NULL; - return; - } - s_rule_num=data_reply->elements; - s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t),s_rule_num); - for(i=0;ielement[i]->str,"%[^,],%ld",s_rule[i].table_name,&(s_rule[i].rule_id)); - assert(ret==2); - } - freeReplyObject(data_reply); - data_reply=NULL; - success_cnt=exec_serial_rule(ctx,s_rule, s_rule_num,server_time, logger); - - if(success_cnt==(int)s_rule_num) - { - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor - ,"Succesfully expired %d rules in Redis.", s_rule_num); - } - else - { - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor - ,"Failed to expired %d of %d rules in Redis, try later.", s_rule_num-success_cnt,s_rule_num); - } - - free(s_rule); - return; -} -void cleanup_update_status(redisContext *ctx, void *logger) -{ - redisReply* reply=NULL,*sub_reply=NULL; - int append_cmd_cnt=0,i=0; - long long server_time=0, version_upper_bound=0,version_lower_bound=0,version_num=0,entry_num=0; - - server_time=redis_server_time(ctx); - if(!server_time) - { - return; - } - reply=_wrap_redisCommand(ctx,"MULTI"); - freeReplyObject(reply); - reply=NULL; - redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME); - append_cmd_cnt++; - redisAppendCommand(ctx, "ZREMRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME); - append_cmd_cnt++; - //consume reply "OK" and "QUEUED". - for(i=0;itype!=REDIS_REPLY_ARRAY) - { - goto error_out; - } - sub_reply=reply->element[0]; - if(sub_reply->type!=REDIS_REPLY_ARRAY) - { - goto error_out; - } - version_num=sub_reply->elements; - if(version_num==0) - { - goto error_out; - } - version_lower_bound=read_redis_integer(sub_reply->element[0]); - version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]); - freeReplyObject(reply); - reply=NULL; - - //To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally. - reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",mr_status_sset,version_lower_bound,version_upper_bound); - entry_num=read_redis_integer(reply); - freeReplyObject(reply); - reply=NULL; - - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor - ,"Clean up update status from version %lld to %lld (%lld versions, %lld entries)." - ,version_lower_bound - ,version_upper_bound - ,version_num - ,entry_num); - return; - -error_out: - freeReplyObject(reply); - reply=NULL; - return; -} -const char* find_Nth_column(const char* line, int Nth, int* column_len) -{ - size_t i=0; - int j=0; - int start=0, end=0; - size_t line_len= strlen(line); - for(i=0;in_foreign; i++) - { - fn_size+=strlen(p->f_keys[i].filename); - } - - rewrite_line=(char*)calloc(sizeof(char), strlen(p->table_line)+fn_size); - pos_origin_line=p->table_line; - pos_rewrite_line=rewrite_line; - - for(i=0; in_foreign; i++) - { - origin_column=find_Nth_column(p->table_line, p->f_keys[i].column, &origin_column_size); - strncat(pos_rewrite_line, pos_origin_line, origin_column-pos_origin_line); - pos_rewrite_line+=origin_column-pos_origin_line; - pos_origin_line=origin_column+origin_column_size; - - strncat(pos_rewrite_line, p->f_keys[i].filename, strlen(p->f_keys[i].filename)); - pos_rewrite_line+=strlen(p->f_keys[i].filename); - } - strncat(pos_rewrite_line, pos_origin_line, strlen(p->table_line)-(pos_origin_line-p->table_line)); - - free(p->table_line); - p->table_line=rewrite_line; - return; -} -void _get_foregin_keys(struct serial_rule_t* p_rule, int* foreign_columns, int n_foreign, const char* dir, void* logger) -{ - int i=0; - const char* p_foreign=NULL; - int foreign_key_size=0; - p_rule->f_keys=ALLOC(struct foreign_key, n_foreign); - for(i=0; itable_line, foreign_columns[i], &foreign_key_size); - if(p_foreign==NULL) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Get %s,%d foreign keys failed: No %dth column.", - p_rule->table_name, p_rule->rule_id, foreign_columns[i]); - continue; - } - if(0==strncasecmp(p_foreign, "null", strlen("null"))) - {//emtpy file - continue; - } - if(0!=strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, - "Get %s,%d foreign key failed: Invalid source prefix %s.", - p_rule->table_name, p_rule->rule_id, p_foreign); - continue; - } - p_rule->f_keys[p_rule->n_foreign].column=foreign_columns[i]; - foreign_key_size=foreign_key_size-strlen(foreign_source_prefix); - p_foreign+=strlen(foreign_source_prefix); - if(0!=strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix))) - { - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, - "%s, %d foreign key prefix %s is not recommended.", - p_rule->table_name, p_rule->rule_id, p_foreign); - } - p_rule->f_keys[p_rule->n_foreign].key=ALLOC(char, foreign_key_size+1); - memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size); - p_rule->f_keys[p_rule->n_foreign].filename=get_foreign_cont_filename(p_rule->table_name, p_rule->rule_id, p_rule->f_keys[p_rule->n_foreign].key, dir); - p_rule->n_foreign++; - } - if(p_rule->n_foreign==0) - { - free(p_rule->f_keys); - p_rule->f_keys=NULL; - } - return; -} -int get_foreign_keys_define(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, _Maat_feather_t* feather, const char* dir,void *logger) -{ - int i=0; - int rule_with_foreign_key=0; - struct Maat_table_schema* p_table=NULL; - struct plugin_table_schema* plugin_desc=NULL; - for(i=0; itable_mgr, rule_list[i].table_name); - if(!p_table||p_table->table_type!=TABLE_TYPE_PLUGIN) - { - continue; - } - plugin_desc= &(p_table->plugin); - if(plugin_desc->n_foreign==0) - { - continue; - } - _get_foregin_keys(rule_list+i, plugin_desc->foreign_columns, plugin_desc->n_foreign, dir, logger); - rule_with_foreign_key++; - } - return rule_with_foreign_key; -} -int get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, const char* dir,void *logger) -{ - int i=0, j=0, foreign_key_size=0; - int rule_with_foreign_key=0; - const char* p_foreign=NULL; - - int n_foreign=0; - int foreign_columns[MAX_FOREIGN_CLMN_NUM]; - for(i=0; i(int)strlen(foreign_source_prefix)&&0==strncmp(p_foreign,foreign_source_prefix, strlen(foreign_source_prefix))) - { - foreign_columns[n_foreign]=j; - n_foreign++; - } - j++; - }while(p_foreign!=NULL&&n_foreign0) - { - _get_foregin_keys(rule_list+i, foreign_columns, n_foreign,dir,logger); - rule_with_foreign_key++; - } - } - return rule_with_foreign_key; -} - -struct foreign_conts_track -{ - int rule_idx; - int foreign_idx; -}; -void _get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger) -{ - int i=0, j=0; - UNUSED int ret=0; - int key_num=0; - struct foreign_conts_track* track=ALLOC(struct foreign_conts_track, rule_num*MAX_FOREIGN_CLMN_NUM); - char redis_cmd[256]; - redisReply* reply=NULL; - struct serial_rule_t*p=NULL; - FILE* fp=NULL; - struct stat file_info; - - for(i=0;in_foreign==0) - { - continue; - } - if(p->op==MAAT_OP_DEL) - { - for(j=0; jn_foreign; j++) - { - if(rule_list[i].f_keys[j].filename==NULL) - { - continue; - } - ret=stat(p->f_keys[j].filename, &file_info); - if(ret==0) - { - continue; - } - snprintf(redis_cmd,sizeof(redis_cmd),"GET %s", p->f_keys[j].key); - ret=redisAppendCommand(ctx, redis_cmd); - track[key_num].rule_idx=i; - track[key_num].foreign_idx=j; - key_num++; - assert(ret==REDIS_OK); - } - } - } - for(i=0;itype!=REDIS_REPLY_STRING) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor - ,"Get %s,%d foreign key %s content failed." - ,rule_list[track[i].rule_idx].table_name - ,rule_list[track[i].rule_idx].rule_id - ,rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key); - continue; - } - else - { - p=rule_list+track[i].rule_idx; - fp=fopen(p->f_keys[track[i].foreign_idx].filename, "w"); - if(fp==NULL) - { - MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor - , "Write foreign content failed: fopen %s error." - , p->f_keys[track[i].foreign_idx]); - } - else - { - fwrite(reply->str, 1, reply->len, fp); - fclose(fp); - fp=NULL; - if(print_fn==1) - { - printf("Written foreign content %s\n",p->f_keys[track[i].foreign_idx].filename); - } - } - } - freeReplyObject(reply); - reply=NULL; - } - - free(track); - return; -} -void get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger) -{ - int max_redis_batch=4*1024,batch_cnt=0; - int success_cnt=0; - while(success_cntlogger; - - if(mr_ctx->write_ctx!=NULL&&mr_ctx->write_ctx->err==0)//authorized to write - { - //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. - if(1==redlock_try_lock(mr_ctx->read_ctx, mr_expire_lock, mr_expire_lock_timeout)) - { - check_maat_expiration(mr_ctx->read_ctx, logger); - cleanup_update_status(mr_ctx->read_ctx, logger); - redlock_unlock(mr_ctx->read_ctx, mr_expire_lock); - } - } - if(mr_ctx->read_ctx==NULL||mr_ctx->read_ctx->err) - { - if(time(NULL)-mr_ctx->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL) - { - return; - } - mr_ctx->last_reconnect_time=time(NULL); - if(mr_ctx->read_ctx!=NULL) - { - redisFree(mr_ctx->read_ctx); - } - MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, "Reconnecting..."); - mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, feather->logger); - if(mr_ctx->read_ctx==NULL) - { - return; - } - else - { - version=0;//Trigger full update when reconnect to redis. - } - } - - rule_num=get_rm_key_list(mr_ctx->read_ctx, version, feather->load_version_from, &new_version, feather->table_mgr, &rule_list, &update_type, logger, feather->cumulative_update_off); - if(rule_num<0)//redis communication error - { - return; - } - feather->load_version_from=0;//only valid for one time. - if(rule_num==0&&update_type==CM_UPDATE_TYPE_INC)//error or nothing changed - { - return; - } - if(rule_num>0) - { - ret=get_maat_redis_value(mr_ctx->read_ctx, rule_list, rule_num, logger, 0); - if(ret<0) - { - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update."); - goto clean_up; - } - for(i=0;i0) - { - MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num); - } - ret=get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, feather, feather->foreign_cont_dir, logger); - if(ret>0) - { - get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, logger); - } - } - start(new_version,update_type,u_para); - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld -> %lld (%d entries).", - update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num); - for(i=0;itable_mgr, rule_list[i].table_name); - if(table_id<0)//Unrecognized table. - { - continue; - } - table_type=Maat_table_get_type_by_id(feather->table_mgr, table_id); - if(rule_list[i].op==MAAT_OP_DEL) - { - - scan_type=Maat_table_get_scan_type(table_type); - table_schema=Maat_table_get_scan_by_id(feather->table_mgr, table_id, scan_type, NULL); - valid_column=Maat_table_xx_plugin_table_get_valid_flag_column(table_schema); - ret=invalidate_line(rule_list[i].table_line, table_type, valid_column); - if(ret<0) - { - MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Invalidate line failed, invaid format %s ." - ,rule_list[i].table_line); - continue; - } - } - if(rule_list[i].n_foreign>0) - { - rewrite_table_line_with_foreign(rule_list+i); - } - update(rule_list[i].table_name,rule_list[i].table_line,u_para); - } - finish(u_para); - -clean_up: - for(i=0;itable_name!=NULL) - { - dst->table_name=_maat_strdup(src->table_name); - } - switch(dst->region_type) - { - case REGION_IP: - dst->ip_rule.src_ip=_maat_strdup(src->ip_rule.src_ip); - dst->ip_rule.mask_src_ip=_maat_strdup(src->ip_rule.mask_src_ip); - dst->ip_rule.dst_ip=_maat_strdup(src->ip_rule.dst_ip); - dst->ip_rule.mask_dst_ip=_maat_strdup(src->ip_rule.mask_dst_ip); - break; - case REGION_EXPR: - dst->expr_rule.keywords=_maat_strdup(src->expr_rule.keywords); - dst->expr_rule.district=_maat_strdup(src->expr_rule.district); - break; - case REGION_INTERVAL: - break; - case REGION_DIGEST: - dst->digest_rule.digest_string=_maat_strdup(src->digest_rule.digest_string); - break; - case REGION_SIMILARITY: - dst->similarity_rule.target=_maat_strdup(src->similarity_rule.target); - break; - default: - assert(0); - } - return; -} -void _maat_empty_region(struct Maat_region_t* p) -{ - free((char*)p->table_name); - p->table_name=NULL; - switch(p->region_type) - { - case REGION_IP: - free((char*)p->ip_rule.src_ip); - free((char*)p->ip_rule.mask_src_ip); - free((char*)p->ip_rule.dst_ip); - free((char*)p->ip_rule.mask_dst_ip); - break; - case REGION_EXPR: - free((char*)p->expr_rule.keywords); - free((char*)p->expr_rule.district); - break; - case REGION_INTERVAL: - break; - case REGION_DIGEST: - free((char*)p->digest_rule.digest_string); - break; - case REGION_SIMILARITY: - free((char*)p->similarity_rule.target); - break; - default: - assert(0); - } - memset(p,0,sizeof(struct Maat_region_t)); - return; - -} -int Maat_command_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, size_t n_line ,enum MAAT_OPERATION op) -{ - size_t i=0; - _Maat_feather_t* _feather=(_Maat_feather_t*)feather; - int ret=0, table_id=0,success_cnt=0; - struct serial_rule_t *s_rule=NULL; - long long server_time=0,absolute_expire_time=0; - redisContext* write_ctx=get_redis_ctx_for_write(_feather); - if(write_ctx==NULL) - { - return -1; - } - server_time=redis_server_time(write_ctx); - if(!server_time) - { - return -1; - } - s_rule=ALLOC(struct serial_rule_t, n_line); - for(i=0;itable_mgr, line_rule[i]->table_name); - if(table_id<0) - { - MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, - "Command raw set line id %d failed: unknown table %s.", - line_rule[i]->rule_id, - line_rule[i]->table_name); - ret=-1; - goto error_out; - } - if(op==MAAT_OP_RENEW_TIMEOUT) - { - assert(line_rule[i]->expire_after>0); - } - if(line_rule[i]->expire_after>0) - { - absolute_expire_time=server_time+line_rule[i]->expire_after; - } - set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name, - line_rule[i]->table_line, absolute_expire_time); - } - success_cnt=exec_serial_rule(write_ctx,s_rule, n_line,server_time,_feather->logger); - if(success_cnt<0||(size_t)success_cnt!=n_line)//error - { - ret=-1; - goto error_out; - } - ret=success_cnt; - _feather->line_cmd_acc_num+=success_cnt; - -error_out: - for(i=0;itable_mgr, line_rule[i]->table_name); - if(table_id<0) - { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command - ,"Command set line id %d failed: unknown table %s." - , line_rule[i]->rule_id - , line_rule[i]->table_name); - ret=-1; - goto error_out; - } - p_table=Maat_table_get_by_id_raw(_feather->table_mgr, table_id); - if(!p_table) - { - ret=-1; - goto error_out; - } - int valid_flag_column=0; - - valid_flag_column=Maat_table_xx_plugin_table_get_valid_flag_column(p_table); - if(valid_flag_column<0) - { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command - ,"Command set line id %d failed: table %s is not a plugin or ip_plugin table." - , line_rule[i]->rule_id - , line_rule[i]->table_name); - ret=-1; - goto error_out; - - } - - if(op==MAAT_OP_ADD) - { - ret=get_valid_flag_offset(line_rule[i]->table_line - , p_table->table_type - , valid_flag_column); - if(ret<0|| - (op==MAAT_OP_ADD&&line_rule[i]->table_line[ret]!='1')) - { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command - ,"Command set line %s %d failed: illegal valid flag." - , line_rule[i]->table_name, line_rule[i]->rule_id); - ret=-1; - goto error_out; - } - } - if(op==MAAT_OP_RENEW_TIMEOUT) - { - assert(line_rule[i]->expire_after>0); - } - if(line_rule[i]->expire_after>0) - { - absolute_expire_time=server_time+line_rule[i]->expire_after; - } - if(plugin_desc && plugin_desc->n_foreign>0) - { - for(j=0;jn_foreign;j++) - { - p_foreign=find_Nth_column(line_rule[i]->table_line, plugin_desc->foreign_columns[j], &foreign_key_size); - if(p_foreign==NULL) - { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL, maat_command - , "Command set line %s %d failed: No %dth column." - , line_rule[i]->table_name, line_rule[i]->rule_id - , plugin_desc->foreign_columns[j]); - ret=-1; - goto error_out; - } - if(0!=strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) - { - MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_redis_monitor - ,"Command set line %s %d failed: Source prefix %s is mandatory." - , line_rule[i]->table_name, line_rule[i]->rule_id, foreign_source_prefix); - ret=-1; - goto error_out; - } - } - - } - set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name, - line_rule[i]->table_line, absolute_expire_time); - } - success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger); - if(success_cnt<0||success_cnt!=line_num)//error - { - ret=-1; - goto error_out; - } - ret=success_cnt; - _feather->line_cmd_acc_num+=success_cnt; - -error_out: - for(i=0;imr_ctx.write_ctx; - if(ctx==NULL) - { - MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, "%s failed: Redis is not connected.", __FUNCTION__); - return -1; - } - const char *arg_vec[3]; - size_t len_vec[3]; - arg_vec[0] = "SET"; - len_vec[0] = strlen("SET"); - - arg_vec[1] = key; - len_vec[1] = strlen(key); - - arg_vec[2] = value; - len_vec[2] = size; - - redisReply *reply=NULL; - if(0!=strncmp(key, foreign_key_prefix, strlen(foreign_key_prefix))) - { - MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, "Invalid File key, prefix %s is mandatory.", foreign_key_prefix); - return -1; - } - switch(op) - { - case MAAT_OP_ADD: - reply= (redisReply *)redisCommandArgv(ctx, sizeof(arg_vec) / sizeof(arg_vec[0]), arg_vec, len_vec); - break; - case MAAT_OP_DEL: - reply=_wrap_redisCommand(ctx,"EXPIRE %s %d", key, MAAT_REDIS_SYNC_TIME); - break; - default: - return -1; - break; - } - if(reply==NULL||reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR) - { - MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command,"Set file failed, maybe Redis is busy."); - freeReplyObject(reply); - reply=NULL; - return -1; - } - freeReplyObject(reply); - reply=NULL; - return 1; -} - -long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment) -{ - _Maat_feather_t* _feather=(_Maat_feather_t*)feather; - redisReply* data_reply=NULL; - long long result=0; - redisContext* write_ctx=get_redis_ctx_for_write(_feather); - - if(write_ctx==NULL) - { - return -1; - } - data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", key, increment); - if(data_reply->type==REDIS_REPLY_INTEGER) - { - result=data_reply->integer; - } - else - { - result=-1; - } - freeReplyObject(data_reply); - data_reply=NULL; - return result; -} -int Maat_command_get_new_group_id(Maat_feather_t feather) -{ - int group_id=0; - group_id=(int) Maat_cmd_incrby(feather, mr_group_id_var, 1); - return group_id; -} -int Maat_command_get_new_region_id(Maat_feather_t feather) -{ - int region_id=0; - region_id=(int) Maat_cmd_incrby(feather, mr_region_id_var, 1); - return region_id; -} - -void Maat_cmd_key_free(struct Maat_cmd_key**keys, int size) -{ - int i=0; - struct Maat_cmd_key* p=*keys; - for(i=0; itable_name); - p->table_name=NULL; - p->rule_id=0; - } - free(*keys); - *keys=NULL; - return; -} - -int Maat_cmd_key_select(Maat_feather_t feather, int label_id, struct Maat_cmd_key** keys) -{ - _Maat_feather_t* _feather=(_Maat_feather_t*)feather; - redisReply* data_reply=NULL; - char* tmp=NULL; - unsigned int i=0; - struct Maat_cmd_key* result=NULL; - int result_cnt=0; - redisContext* write_ctx=get_redis_ctx_for_write(_feather); - if(write_ctx==NULL) - { - return -1; - } - - data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d", - mr_label_sset, - label_id, - label_id); - result_cnt=data_reply->elements; - result=ALLOC(struct Maat_cmd_key, data_reply->elements); - for(i=0;ielements;i++) - { - result[i].table_name=_maat_strdup(data_reply->element[i]->str); - tmp=strchr(result[i].table_name, ','); - if(tmp!=NULL) - { - *tmp='\0'; - tmp++; - result[i].rule_id=atoi(tmp); - } - else// old version compatible - { - result[i].rule_id=atoi(result[i].table_name); - free(result[i].table_name); - result[i].table_name=NULL; - } - } - freeReplyObject(data_reply); - data_reply=NULL; - - *keys=result; - return result_cnt; -} - -int redis_flush_DB(redisContext* ctx, int db_index, void* logger) -{ - redisReply* data_reply=NULL; - long long maat_redis_version=0, dbsize=0; - int append_cmd_cnt=0, i=0,ret=0; - int redis_transaction_success=1; - - data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); - freeReplyObject(data_reply); - data_reply=NULL; - data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); - if(data_reply->type==REDIS_REPLY_NIL) - { - maat_redis_version=0; - } - else - { - maat_redis_version=read_redis_integer(data_reply); - maat_redis_version++; - freeReplyObject(data_reply); - data_reply=NULL; - } - data_reply=_wrap_redisCommand(ctx, "DBSIZE"); - dbsize=read_redis_integer(data_reply); - freeReplyObject(data_reply); - data_reply=NULL; - - data_reply=_wrap_redisCommand(ctx,"MULTI"); - freeReplyObject(data_reply); - data_reply=NULL; - - redisAppendCommand(ctx,"FLUSHDB"); - append_cmd_cnt++; - redisAppendCommand(ctx,"SET MAAT_VERSION %lld",maat_redis_version); - append_cmd_cnt++; - redisAppendCommand(ctx,"SET MAAT_PRE_VER %lld",maat_redis_version); - append_cmd_cnt++; - redisAppendCommand(ctx,"SET %s 1", mr_region_id_var); - append_cmd_cnt++; - redisAppendCommand(ctx,"SET %s 1", mr_group_id_var); - append_cmd_cnt++; - redisAppendCommand(ctx,"EXEC"); - append_cmd_cnt++; - for(i=0;iqueue); - batch->feather=(struct _Maat_feather_t *)feather; - redisContext* write_ctx=get_redis_ctx_for_write(batch->feather); - if(write_ctx==NULL) - { - free(batch); - return NULL; - } - batch->server_time=redis_server_time(write_ctx); - if(!batch->server_time) - { - free(batch); - return NULL; - } - return batch; -} - -int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id) -{ - struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); - long long absolute_expire_time=0; - char line[MAX_TABLE_LINE_SIZE]; - - serialize_region(region, group_id, line, sizeof(line)); - - set_serial_rule(s_rule, op, region->region_id, 0, region->table_name, - line, absolute_expire_time); - TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); - batch->batch_size++; - return 0; - -} -#define TO_GROUP2X_KEY(group_id, parent_id) ((unsigned long)group_id<<32|parent_id) - -int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g) -{ - struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); - long long absolute_expire_time=0; - char line[MAX_TABLE_LINE_SIZE]; - - serialize_group2group(op, g2g, line, sizeof(line)); - - set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2g->group_id, g2g->superior_group_id), 0, g2g->table_name, - line, absolute_expire_time); - - TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); - batch->batch_size++; - return 0; -} -int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c) -{ - struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); - long long absolute_expire_time=0; - char line[MAX_TABLE_LINE_SIZE]; - - serialize_group2compile(op, g2c, line, sizeof(line)); - set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2c->group_id, g2c->compile_id), 0, g2c->table_name, - line, absolute_expire_time); - TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); - batch->batch_size++; - return 0; -} -int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after) -{ - - struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); - long long absolute_expire_time=0; - char line[MAX_TABLE_LINE_SIZE]; - serialize_compile(compile, huge_service_defined, clause_num, op, line, sizeof(line)); - - if(expire_after>0) - { - absolute_expire_time=batch->server_time+expire_after; - } - set_serial_rule(s_rule, op, compile->config_id, label_id, table_name, - line, absolute_expire_time); - - TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); - batch->batch_size++; - return 0; -} -int Maat_command_batch_commit(struct Maat_command_batch* batch) -{ - struct serial_rule_t* s_rule_array=ALLOC(struct serial_rule_t, batch->batch_size); - int i=0; - redisContext* write_ctx=get_redis_ctx_for_write(batch->feather); - struct serial_rule_t * tmp = TAILQ_FIRST(&batch->queue); - - while(tmp != NULL) - { - TAILQ_REMOVE(&batch->queue, tmp, entries); - memcpy(s_rule_array+i, tmp, sizeof(*tmp)); - free(tmp); - tmp = TAILQ_FIRST(&batch->queue); - i++; - } - assert(i==batch->batch_size); - exec_serial_rule(write_ctx, s_rule_array, batch->batch_size, batch->server_time, batch->feather->logger); - for(i=0; ibatch_size; i++) - { - empty_serial_rules(s_rule_array+i); - } - free(s_rule_array); - free(batch); - return i; -} - -int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after) -{ - struct Maat_command_batch* batch=NULL; - batch=Maat_command_batch_new(feather); - Maat_command_batch_set_compile(batch, op, compile, table_name, huge_service_defined, clause_num, label_id, expire_after); - Maat_command_batch_commit(batch); - return 0; -} -int Maat_command_raw_set_region(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id) -{ - struct Maat_command_batch* batch=NULL; - batch=Maat_command_batch_new(feather); - Maat_command_batch_set_region(batch, op, region, group_id); - Maat_command_batch_commit(batch); - return 0; -} -int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c) -{ - struct Maat_command_batch* batch=NULL; - batch=Maat_command_batch_new(feather); - Maat_command_batch_set_group2compile(batch, op, g2c); - Maat_command_batch_commit(batch); - return 0; -} -int Maat_command_raw_set_group2group(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g) -{ - - struct Maat_command_batch* batch=NULL; - batch=Maat_command_batch_new(feather); - Maat_command_batch_set_group2group(batch, op, g2g); - Maat_command_batch_commit(batch); - return 0; -} - -int Maat_cmd_flushDB(Maat_feather_t feather) -{ - _Maat_feather_t* _feather=(_Maat_feather_t*)feather; - int ret=0; - redisContext* write_ctx=get_redis_ctx_for_write(_feather); - if(write_ctx==NULL) - { - return -1; - } - do - { - ret=redis_flush_DB(_feather->mr_ctx.write_ctx, _feather->mr_ctx.redis_db, _feather->logger); - }while(ret==0); - return 0; -} - +#include "Maat_command.h" +#include "Maat_rule.h" +#include "Maat_rule_internal.h" +#include "Maat_utils.h" +#include "config_monitor.h" +#include "map_str2int.h" +#include "hiredis.h" +#include +#include +#include +#include +#include + +#define maat_redis_monitor (module_name_str("MAAT_REDIS_MONITOR")) +#define maat_command (module_name_str("MAAT_COMMAND")) +const time_t MAAT_REDIS_RECONNECT_INTERVAL=5; +const char* mr_key_prefix[2]={"OBSOLETE_RULE","EFFECTIVE_RULE"}; +const char* mr_status_sset="MAAT_UPDATE_STATUS"; +const char* mr_expire_sset="MAAT_EXPIRE_TIMER"; +const char* mr_label_sset="MAAT_LABEL_INDEX"; +const char* mr_version_sset="MAAT_VERSION_TIMER"; +const char* mr_expire_lock="EXPIRE_OP_LOCK"; +const long mr_expire_lock_timeout=300*1000; +const static int MAAT_REDIS_SYNC_TIME=30*60; +const char* mr_op_str[]={"DEL","ADD","RENEW_TIMEOUT"}; +const char* foreign_source_prefix="redis://"; +const char* foreign_key_prefix="__FILE_"; + + + +int _wrap_redisGetReply(redisContext *c, redisReply **reply) +{ + return redisGetReply(c, (void **)reply); +} +redisReply *_wrap_redisCommand(redisContext *c, const char *format, ...) +{ + va_list ap; + void *reply = NULL; + int ret=REDIS_ERR, retry=0; + while(reply==NULL&&retry<2&&ret!=REDIS_OK) + { + va_start(ap,format); + reply = redisvCommand(c,format,ap); + va_end(ap); + if(reply==NULL) + { + ret=redisReconnect(c); + retry++; + } + } + return (redisReply *)reply; +} +redisContext * connect_redis(const char*redis_ip, int redis_port, int redis_db, void* logger) +{ + struct timeval connect_timeout; + connect_timeout.tv_sec=0; + connect_timeout.tv_usec=100*1000; // 100 ms + redisReply* reply=NULL; + + redisContext * ctx; + ctx=redisConnectWithTimeout(redis_ip, redis_port,connect_timeout); + if(ctx==NULL||ctx->err) + { + if(logger==NULL) + { + printf("Unable to connect redis server %s:%d db%d, error: %s\n", + redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr); + + } + else + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Unable to connect redis server %s:%d db%d, error: %s", + redis_ip, redis_port, redis_db, ctx==NULL ? "Unknown" : ctx->errstr); + } + if(ctx!=NULL) redisFree(ctx); + return NULL; + } + redisEnableKeepAlive(ctx); + reply=_wrap_redisCommand(ctx, "select %d",redis_db); + freeReplyObject(reply); + reply=NULL; + + return ctx; + +} + +int connect_redis_for_write(struct source_redis_ctx* mr_ctx, void* logger) +{ + assert(mr_ctx->write_ctx==NULL); + mr_ctx->write_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, logger); + if(mr_ctx->write_ctx==NULL) + { + return -1; + } + else + { + return 0; + } +} +redisContext* get_redis_ctx_for_write(struct _Maat_feather_t * feather) +{ + int ret=0; + if(feather->mr_ctx.write_ctx==NULL) + { + ret=connect_redis_for_write(&(feather->mr_ctx), feather->logger); + if(ret!=0) + { + return NULL; + } + } + return feather->mr_ctx.write_ctx; +} +long long read_redis_integer(const redisReply* reply) +{ + switch(reply->type) + { + case REDIS_REPLY_INTEGER: + return reply->integer; + break; + case REDIS_REPLY_ARRAY: + assert(reply->element[0]->type==REDIS_REPLY_INTEGER); + return reply->element[0]->integer; + break; + case REDIS_REPLY_STRING: + return atoll(reply->str); + break; + default: + return -1; + break; + } + return 0; +} +long long redis_server_time(redisContext* ctx) +{ + long long server_time=0; + redisReply* data_reply=NULL; + data_reply=_wrap_redisCommand(ctx,"TIME"); + if(data_reply->type==REDIS_REPLY_ARRAY) + { + server_time=atoll(data_reply->element[0]->str); + freeReplyObject(data_reply); + data_reply=NULL; + } + return server_time; +} +enum MAAT_TABLE_TYPE type_region2table(const struct Maat_region_t* p) +{ + enum MAAT_TABLE_TYPE ret=TABLE_TYPE_IP; + switch(p->region_type) + { + case REGION_IP: + ret=TABLE_TYPE_IP; + break; + case REGION_EXPR: + if(p->expr_rule.district==NULL) + { + ret=TABLE_TYPE_EXPR; + } + else + { + ret=TABLE_TYPE_EXPR_PLUS; + } + break; + case REGION_INTERVAL: + if(p->interval_rule.district==NULL) + { + ret=TABLE_TYPE_INTERVAL; + } + else + { + ret=TABLE_TYPE_INTERVAL_PLUS; + } + break; + case REGION_DIGEST: + ret=TABLE_TYPE_DIGEST; + break; + case REGION_SIMILARITY: + ret=TABLE_TYPE_SIMILARITY; + break; + default: + assert(0); + } + return ret; +} +int get_valid_flag_offset(const char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq) +{ + size_t offset=0, len=0; + unsigned int column_seq=0, ret=0; + switch(type) + { + case TABLE_TYPE_EXPR: + column_seq=7; + break; + case TABLE_TYPE_IP: + column_seq=14; + break; + case TABLE_TYPE_IP_PLUS: + column_seq=18; + break; + case TABLE_TYPE_COMPILE: + column_seq=8; + break; + case TABLE_TYPE_PLUGIN: + case TABLE_TYPE_IP_PLUGIN: + case TABLE_TYPE_FQDN_PLUGIN: + if(valid_column_seq<0) + { + return -1; + } + column_seq=(unsigned int)valid_column_seq; + break; + case TABLE_TYPE_INTERVAL: + column_seq=5; + break; + case TABLE_TYPE_INTERVAL_PLUS: + column_seq=6; + break; + case TABLE_TYPE_DIGEST: + column_seq=6; + break; + case TABLE_TYPE_SIMILARITY: + column_seq=5; + break; + case TABLE_TYPE_EXPR_PLUS: + column_seq=8; + break; + case TABLE_TYPE_GROUP2COMPILE: + case TABLE_TYPE_GROUP2GROUP: + column_seq=3; + break; + default: + assert(0); + } + + ret=get_column_pos(line, column_seq, &offset, &len); + if(ret<0||offset>=strlen(line)||(line[offset]!='1'&&line[offset]!='0'))// 0 is also a valid value for some non-MAAT producer. + { + return -1; + } + return offset; +} +int invalidate_line(char* line, enum MAAT_TABLE_TYPE type,int valid_column_seq) +{ + int i=0; + i=get_valid_flag_offset(line, type,valid_column_seq); + if(i<0) + { + return -1; + } + line[i]='0'; + return 0; +} + +void serialize_group2group(enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g, char* buff, size_t sz) +{ + snprintf(buff, sz, "%d\t%d\t%d", g2g->group_id, + g2g->superior_group_id, + op); + return; +} +void serialize_group2compile(enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c, char* buff, size_t sz) +{ + snprintf(buff, sz, "%d\t%d\t%d\t%d\t%s\t%d", g2c->group_id, + g2c->compile_id, + op, + g2c->not_flag, + g2c->virtual_table_name?g2c->virtual_table_name:"null", + g2c->clause_index); + return; +} +void serialize_compile(const struct Maat_rule_t* p_m_rule, const char* huge_service_defined, int clause_num, enum MAAT_OPERATION op, char* buff, size_t sz) +{ + if(op==MAAT_OP_RENEW_TIMEOUT) op=MAAT_OP_ADD; + const char* service_define=huge_service_defined?huge_service_defined:(strlen(p_m_rule->service_defined)?p_m_rule->service_defined:"null"); + + snprintf(buff, sz, "%d\t%d\t%hhu\t%hhu\t%hhu\t0\t%s\t%d\t%d", + p_m_rule->config_id, + p_m_rule->service_id, + p_m_rule->action, + p_m_rule->do_blacklist, + p_m_rule->do_log, + service_define, + op, + clause_num); + return; +} +void serialize_region(const struct Maat_cmd_region* p, int group_id, char* buff, size_t sz) +{ + UNUSED size_t ret=0; + switch(p->region_type) + { + case REGION_IP: + ret=snprintf(buff, sz, "%d\t%d\t%d\t%s\t%s\t%hu\t%hu\t%s\t%s\t%hu\t%hu\t%d\t%d\t1", + p->region_id, + group_id, + p->ip_rule.addr_type, + p->ip_rule.src_ip, + p->ip_rule.mask_src_ip, + p->ip_rule.src_port, + p->ip_rule.mask_src_port, + p->ip_rule.dst_ip, + p->ip_rule.mask_dst_ip, + p->ip_rule.dst_port, + p->ip_rule.mask_dst_port, + p->ip_rule.protocol, + p->ip_rule.direction); + break; + case REGION_IP_PLUS: + ret=snprintf(buff, sz, "%d\t%d\t%d\t%s\t%s\t%s\t%s\t%hu\t%hu\t%s\t%s\t%s\t%s\t%hu\t%hu\t%d\t%d\t1", + p->region_id, + group_id, + p->ip_plus_rule.addr_type, + p->ip_plus_rule.saddr_format, + p->ip_plus_rule.src_ip1, + p->ip_plus_rule.src_ip2, + p->ip_plus_rule.sport_format, + p->ip_plus_rule.src_port1, + p->ip_plus_rule.src_port2, + p->ip_plus_rule.daddr_format, + p->ip_plus_rule.dst_ip1, + p->ip_plus_rule.dst_ip2, + p->ip_plus_rule.dport_format, + p->ip_plus_rule.dst_port1, + p->ip_plus_rule.dst_port2, + p->ip_plus_rule.protocol, + p->ip_plus_rule.direction); + break; + case REGION_EXPR: + if(p->expr_rule.district==NULL) + { + ret=snprintf(buff,sz,"%d\t%d\t%s\t%d\t%d\t%d\t1", + p->region_id, + group_id, + p->expr_rule.keywords, + p->expr_rule.expr_type, + p->expr_rule.match_method, + p->expr_rule.hex_bin); + } + else //expr_plus + { + ret=snprintf(buff,sz,"%d\t%d\t%s\t%s\t%d\t%d\t%d\t1", + p->region_id, + group_id, + p->expr_rule.district, + p->expr_rule.keywords, + p->expr_rule.expr_type, + p->expr_rule.match_method, + p->expr_rule.hex_bin); + } + break; + case REGION_INTERVAL: + ret=snprintf(buff,sz,"%d\t%d\t%u\t%u\t1", + p->region_id, + group_id, + p->interval_rule.low_boundary, + p->interval_rule.up_boundary); + break; + case REGION_DIGEST: + ret=snprintf(buff,sz,"%d\t%d\t%llu\t%s\t%hd\t1", + p->region_id, + group_id, + p->digest_rule.orgin_len, + p->digest_rule.digest_string, + p->digest_rule.confidence_degree); + break; + case REGION_SIMILARITY: + ret=snprintf(buff,sz,"%d\t%d\t%s\t%hd\t1", + p->region_id, + group_id, + p->similarity_rule.target, + p->similarity_rule.threshold); + break; + default: + assert(0); + } + assert(rettable_line!=NULL) + { + free(rule->table_line); + } + if(rule->n_foreign>0) + { + for(int i=0; in_foreign; i++) + { + free(rule->f_keys[i].filename); + free(rule->f_keys[i].key); + } + free(rule->f_keys); + } + memset(rule,0,sizeof(struct serial_rule_t)); + return; +} +void set_serial_rule(struct serial_rule_t* rule, enum MAAT_OPERATION op, unsigned long rule_id,int label_id,const char* table_name,const char* line, long long timeout) +{ + memset(rule, 0, sizeof(struct serial_rule_t)); + rule->op=op; + rule->rule_id=rule_id; + rule->label_id=label_id; + rule->timeout=timeout; + assert(strlen(table_name)table_name)); + strncpy(rule->table_name, table_name, sizeof(rule->table_name)); + if(line!=NULL) + { + rule->table_line=_maat_strdup(line); + } + return; +} +int get_inc_key_list(long long instance_version, long long target_version, redisContext *c, struct serial_rule_t** list,void* logger) +{ + redisReply* reply=NULL,*tmp_reply=NULL; + char err_buff[256], op_str[4]; + int rule_num=0; + UNUSED int ret=0; + unsigned int i=0, j=0; + long long nearest_rule_version; + struct serial_rule_t *s_rule=NULL; + + //Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version. + //The elements are considered to be ordered from low to high scores(instance_version). + reply=(redisReply*)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",mr_status_sset,instance_version,target_version); + + if(reply==NULL) + { + __redis_strerror_r(errno,err_buff,sizeof(err_buff)-1); + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "GET %s failed %s.",mr_status_sset,err_buff); + return -1; + } + assert(reply->type==REDIS_REPLY_ARRAY); + rule_num=reply->elements; + if(reply->elements==0) + { + freeReplyObject(reply); + reply=NULL; + return 0; + } + + tmp_reply=_wrap_redisCommand(c, "ZSCORE %s %s",mr_status_sset,reply->element[0]->str); + if(tmp_reply->type!=REDIS_REPLY_STRING) + { + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "ZSCORE %s %s failed Version: %lld->%lld",mr_status_sset,reply->element[0]->str,instance_version, target_version); + freeReplyObject(tmp_reply); + tmp_reply=NULL; + freeReplyObject(reply); + reply=NULL; + return -1; + } + nearest_rule_version=read_redis_integer(tmp_reply); + freeReplyObject(tmp_reply); + tmp_reply=NULL; + if(nearest_rule_version<0) + { + return -1; + } + if(nearest_rule_version!=instance_version+1) + { + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "Noncontinuous VERSION Redis: %lld MAAT: %lld.",nearest_rule_version,instance_version); + } + s_rule=(struct serial_rule_t*)calloc(reply->elements,sizeof(struct serial_rule_t)); + for(i=0, j=0;ielements;i++) + { + assert(reply->element[i]->type==REDIS_REPLY_STRING); + ret=sscanf(reply->element[i]->str,"%[^,],%[^,],%lu",op_str,s_rule[j].table_name,&(s_rule[j].rule_id)); + if(ret!=3||s_rule[i].rule_id<0) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Invalid Redis Key: %s",reply->element[i]->str); + continue; + } + if(strncmp(op_str,"ADD",strlen("ADD"))==0) + { + s_rule[j].op=MAAT_OP_ADD; + } + else if(strncmp(op_str,"DEL",strlen("DEL"))==0) + { + s_rule[j].op=MAAT_OP_DEL; + } + else + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Invalid Redis Key: %s",reply->element[i]->str); + continue; + } + j++; + } + rule_num=j; + *list=s_rule; + freeReplyObject(reply); + reply=NULL; + + return rule_num; +} +struct s_rule_array_t +{ + int cnt; + int size; + struct serial_rule_t* array; +}; +void save_serial_rule_cb(const uchar * key, uint size, void * data, void * user) +{ + struct s_rule_array_t* array=(struct s_rule_array_t*)user; + int i=array->cnt; + memcpy(&(array->array[i]),data,sizeof(struct serial_rule_t)); + array->array[i].op=MAAT_OP_ADD; + return; +} +int recovery_history_version(const struct serial_rule_t* current, int current_num, const struct serial_rule_t* changed, int changed_num, struct serial_rule_t** history_result) +{ + int i=0,ret=0; + unsigned int history_num=0; + int hash_slot_size=1; + MESA_htable_handle htable=NULL; + MESA_htable_create_args_t hargs; + struct s_rule_array_t tmp_array; + char hkey[256+20]; + int tmp=current_num+changed_num; + for(;tmp>0;tmp=tmp/2) + { + hash_slot_size*=2; + } + + memset(&hargs,0,sizeof(hargs)); + hargs.thread_safe=0; + hargs.hash_slot_size = hash_slot_size; + hargs.max_elem_num = 0; + hargs.eliminate_type = HASH_ELIMINATE_ALGO_FIFO; + hargs.expire_time = 0; + hargs.key_comp = NULL; + hargs.key2index = NULL; + hargs.recursive = 1; + hargs.data_free = NULL;//data is an reference, no need to free. + hargs.data_expire_with_condition = NULL; + htable=MESA_htable_create(&hargs, sizeof(hargs)); + MESA_htable_print_crtl(htable, 0); + + for(i=0;i0); + } + + for(i=changed_num-1;i>=0;i--) + { + snprintf(hkey,sizeof(hkey),"%ld,%s",changed[i].rule_id,changed[i].table_name); + if(changed[i].op==MAAT_OP_ADD)//newly added rule is need to delete from current, so that history version can be recovered. + { + ret=MESA_htable_del(htable, (uchar*)hkey, strlen(hkey), NULL); + } + else + { + ret=MESA_htable_add(htable, (uchar*)hkey, strlen(hkey),changed+i); + } + if(ret<0)//failed + { + goto error_out; + } + } + history_num=MESA_htable_get_elem_num(htable); + tmp_array.cnt=0; + tmp_array.size=history_num; + tmp_array.array=(struct serial_rule_t*)calloc(history_num,sizeof(struct serial_rule_t)); + MESA_htable_iterate(htable, save_serial_rule_cb, &tmp_array); + *history_result=tmp_array.array; + ret=history_num; +error_out: + MESA_htable_destroy(htable, NULL); + return ret; +} + +int get_rm_key_list(redisContext *c, long long instance_version, long long desired_version, long long* new_version, struct Maat_table_manager* table_mgr, struct serial_rule_t** list,int *update_type, void* logger, int cumulative_off) +{ + redisReply* reply=NULL,*sub_reply=NULL; + char err_buff[256]; + long long redis_version=0,target_version=0; + int rule_num=0, changed_rule_num=0, table_id=0; + int ret=0; + unsigned int i=0,full_idx =0,append_cmd_cnt=0; + struct serial_rule_t *s_rule_array=NULL, *changed_rule_array=NULL, *history_rule_array=NULL; + + reply=(redisReply*)redisCommand(c, "GET MAAT_VERSION"); + if(reply!=NULL) + { + + if(reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor,"GET MAAT_VERSION failed, maybe Redis is busy."); + freeReplyObject(reply); + reply=NULL; + return -1; + } + } + else + { + memset(err_buff, 0, sizeof(err_buff)); + __redis_strerror_r(errno, err_buff, sizeof(err_buff)-1); + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "GET MAAT_VERSION failed %s.",err_buff); + return -1; + } + redis_version=read_redis_integer(reply); + if(redis_version<0) + { + if(reply->type==REDIS_REPLY_ERROR) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Redis Communication error: %s.",reply->str); + } + return -1; + } + freeReplyObject(reply); + reply=NULL; + if(redis_version==instance_version) + { + return 0; + } + + if(instance_version==0||desired_version!=0) + { + goto FULL_UPDATE; + } + if(redis_version Redis: %lld.",instance_version,redis_version); + goto FULL_UPDATE; + } + if(redis_version>instance_version&&cumulative_off==1) + { + target_version=instance_version; + } + else + { + target_version=redis_version-1; + } + do{ + target_version++; + rule_num=get_inc_key_list(instance_version, target_version, c, &s_rule_array,logger); + if(rule_num>0) + { + break; + } + else if(rule_num<0) + { + goto FULL_UPDATE; + } + else + { + //ret=0, nothing to do. + } + + }while(rule_num==0&&target_version<=redis_version&&cumulative_off==1); + if(rule_num==0) + { + MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_redis_monitor,"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s" + ,mr_status_sset,instance_version,target_version-1,cumulative_off==1?"OFF":"ON"); + return 0; + } + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "Inc Update from instance_version %lld to %lld (%d entries).",instance_version,target_version,rule_num); + *list=s_rule_array; + *update_type=CM_UPDATE_TYPE_INC; + *new_version=target_version; + return rule_num; + +FULL_UPDATE: + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "Initiate full udpate from instance_version %d to %lld.",instance_version,desired_version==0?redis_version:desired_version); + append_cmd_cnt=0; + ret=redisAppendCommand(c, "MULTI"); + append_cmd_cnt++; + ret=redisAppendCommand(c, "GET MAAT_VERSION"); + append_cmd_cnt++; + ret=redisAppendCommand(c, "KEYS EFFECTIVE_RULE:*"); + append_cmd_cnt++; + //consume reply "OK" and "QUEUED". + for(i=0;ierrstr); + return -1; + } + if(reply->type!=REDIS_REPLY_ARRAY) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Invalid Redis Key List type %d", reply->type); + freeReplyObject(reply); + reply=NULL; + return -1; + } + *new_version=read_redis_integer(reply->element[0]); + sub_reply=reply->element[1]; + if(sub_reply->type!=REDIS_REPLY_ARRAY) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Invalid Redis Key List type %d", sub_reply->type); + freeReplyObject(reply); + reply=NULL; + return -1; + } + + s_rule_array=(struct serial_rule_t*)calloc(sub_reply->elements,sizeof(struct serial_rule_t)); + for(i=0, full_idx=0; ielements; i++) + { + if(sub_reply->element[i]->type!=REDIS_REPLY_STRING) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Invalid Redis Key Type: %d", sub_reply->element[i]->type); + continue; + } + ret=sscanf(sub_reply->element[i]->str,"%*[^:]:%[^,],%ld",s_rule_array[full_idx].table_name,&(s_rule_array[full_idx].rule_id)); + s_rule_array[full_idx].op=MAAT_OP_ADD; + if(ret!=2||s_rule_array[full_idx].rule_id<0||strlen(s_rule_array[full_idx].table_name)==0) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Invalid Redis Key Format: %s", sub_reply->element[i]->str); + continue; + } + if(table_mgr) + { + table_id=Maat_table_get_id_by_name(table_mgr, s_rule_array[full_idx].table_name); + if(table_id<0)//Unrecognized table. + { + continue; + } + } + full_idx++; + } + rule_num=full_idx; + freeReplyObject(reply); + reply=NULL; + if(desired_version!=0) + { + changed_rule_num=get_inc_key_list(desired_version, redis_version, c, &changed_rule_array, logger); + if(changed_rule_num<0) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Recover history version %lld faild where as redis version is %lld.", desired_version, redis_version); + } + else if(changed_rule_num==0) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Nothing to recover from history version %lld to redis version is %lld.", desired_version, redis_version); + } + else + { + ret=recovery_history_version(s_rule_array, full_idx, changed_rule_array, changed_rule_num, &history_rule_array); + if(ret>0) + { + free(s_rule_array); + s_rule_array=history_rule_array; + rule_num=ret; + *new_version=desired_version; + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Successfully recovered from history version %lld to redis version is %lld.", desired_version, redis_version); + } + } + free(changed_rule_array); + } + *list=s_rule_array; + *update_type=CM_UPDATE_TYPE_FULL; + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "Full update %d keys of version %lld.", rule_num, *new_version); + + return rule_num ; +} + +int _get_maat_redis_value(redisContext *c, struct serial_rule_t* rule_list, int rule_num, void* logger) +{ + int i=0,failed_cnt=0,idx=0; + UNUSED int ret=0; + int error_happened=0; + int *retry_ids=(int*)malloc(sizeof(int)*rule_num); + char redis_cmd[256]; + redisReply* reply=NULL; + for(i=0;itype==REDIS_REPLY_STRING) + { + rule_list[i].table_line=_maat_strdup(reply->str); + } + else + { + if(reply->type==REDIS_REPLY_NIL) + { + retry_ids[failed_cnt]=i; + failed_cnt++; + } + else + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor + ,"Redis GET %s:%s,%d failed",mr_key_prefix[rule_list[i].op] + ,rule_list[i].table_name + ,rule_list[i].rule_id); + error_happened=1; + } + } + freeReplyObject(reply); + reply=NULL; + } + if(error_happened==1) + { + free(retry_ids); + return -1; + } + + for(i=0;itype==REDIS_REPLY_STRING) + { + rule_list[idx].table_line=_maat_strdup(reply->str); + } + else if(reply->type==REDIS_REPLY_ERROR)//Deal with Redis response: "Loading Redis is loading the database in memory" + { + MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor, + "redis command %s error, reply type=%d, error str=%s", redis_cmd, reply->type, reply->str); + } + else //Handle type "nil" + { + MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor, + "redis command %s failed, reply type=%d", redis_cmd, reply->type); + } + + freeReplyObject(reply); + reply=NULL; + + } + free(retry_ids); + return 0; +} +int get_maat_redis_value(redisContext *c,struct serial_rule_t* rule_list,int rule_num,void* logger,int print_process) +{ + int max_redis_batch=4*1024,batch_cnt=0; + int success_cnt=0,ret=0; + int next_print=10; + while(success_cntnext_print) + { + printf(" >%d%%",next_print); + next_print+=10; + } + } + } + if(print_process==1) + { + printf(" >100%%\n"); + } + return 0; +} + +int mr_transaction_success(redisReply* data_reply) +{ + if(data_reply->type==REDIS_REPLY_NIL) + { + return 0; + } + else + { + return 1; + } +} + +int redlock_try_lock(redisContext *ctx, const char* lock_name, long long expire) +{ + redisReply* reply=NULL; + int ret=0; + reply=_wrap_redisCommand(ctx,"SET %s locked NX PX %lld", lock_name, expire); + if(reply->type==REDIS_REPLY_NIL) + { + ret=0; + } + else + { + ret=1; + } + freeReplyObject(reply); + reply=NULL; + + return ret; +} +void redlock_unlock(redisContext * ctx, const char * lock_name) +{ + redisReply* reply=NULL; + reply=_wrap_redisCommand(ctx,"DEL %s", lock_name); + freeReplyObject(reply); + reply=NULL; + +} +#define POSSIBLE_REDIS_REPLY_SIZE 2 +struct expected_reply +{ + int srule_seq; + int possible_reply_num; + redisReply possible_replies[POSSIBLE_REDIS_REPLY_SIZE]; +}; +void expected_reply_add(struct expected_reply* expected, int srule_seq, int type, long long integer) +{ + int i=expected->possible_reply_num; + assert(isrule_seq=srule_seq; + expected->possible_replies[i].type=type; + expected->possible_replies[i].integer=integer; + expected->possible_reply_num++; +} +int mr_operation_success(redisReply* actual_reply, struct expected_reply* expected) +{ + int i=0; + if(expected->possible_replies[0].type!=actual_reply->type) + { + return 0; + } + for(i=0; i< expected->possible_reply_num; i++) + { + if(expected->possible_replies[i].type==REDIS_REPLY_INTEGER && + expected->possible_replies[i].type==actual_reply->type && + expected->possible_replies[i].integer==actual_reply->integer) + { + return 1; + } + if(expected->possible_replies[i].type==REDIS_REPLY_STATUS && + expected->possible_replies[i].type==actual_reply->type && + 0==strcasecmp(actual_reply->str, "OK")) + { + return 1; + } + } + return 0; + +} + +long long _exec_serial_rule_begin(redisContext* ctx,int rule_num, int renew_rule_num,int *renew_allowed, long long *transaction_version) +{ + int ret=-1; + redisReply* data_reply=NULL; + if(renew_rule_num>0) + { + while(0==redlock_try_lock(ctx, mr_expire_lock, mr_expire_lock_timeout)) + { + usleep(1000); + } + *renew_allowed=1; + } + if(rule_num>renew_rule_num) + { + data_reply=_wrap_redisCommand(ctx, "INCRBY MAAT_PRE_VER 1"); + *transaction_version=read_redis_integer(data_reply); + freeReplyObject(data_reply); + data_reply=NULL; + if(*transaction_version<0) + { + return -1; + } + } + if(*renew_allowed==1||rule_num>renew_rule_num) + { + data_reply=_wrap_redisCommand(ctx,"MULTI"); + freeReplyObject(data_reply); + data_reply=NULL; + ret=0; + } + return ret; +} +//parameters: 4 keys: MAAT_VERSION MAAT_UPDATE_STATUS MAAT_VERSION_TIMER MAAT_TRANSACTION_xx, 1 args: SERVER_TIME +const char* lua_exec_done= +"local maat_version=redis.call(\'incrby\', KEYS[1], 1);" +"local transaction=redis.call(\'lrange\', KEYS[4], 0, -1);" +"for k,v in pairs(transaction) do" +" redis.call(\'zadd\', KEYS[2], maat_version, v);" +"end;" +"redis.call(\'del\', KEYS[4]);" +"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);" +"return maat_version;"; +redisReply* _exec_serial_rule_end(redisContext* ctx, const char* transaction_list, long long server_time, int renew_allowed, struct expected_reply* expect_reply, unsigned int *cnt) +{ + redisReply* data_reply=NULL; + if(renew_allowed==1) + { + redlock_unlock(ctx, mr_expire_lock); + expect_reply[*cnt].srule_seq=-1; + (*cnt)++; + } + if(strlen(transaction_list)>0) + { + data_reply=_wrap_redisCommand(ctx, "eval %s 4 MAAT_VERSION %s %s %s %lld", + lua_exec_done, + mr_status_sset, + mr_version_sset, + transaction_list, + server_time); + freeReplyObject(data_reply); + data_reply=NULL; + expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); + (*cnt)++; + } + data_reply=_wrap_redisCommand(ctx,"EXEC"); + return data_reply; +} + +void _exec_serial_rule(redisContext* ctx, const char* transaction_list, struct serial_rule_t* s_rule, unsigned int rule_num, struct expected_reply* expect_reply, unsigned int *cnt, int offset,int renew_allowed) +{ + redisReply* data_reply=NULL; + unsigned int append_cmd_cnt=0, i=0; + for(i=0;i0) + { + redisAppendCommand(ctx,"ZADD %s %lld %s,%lu", + mr_expire_sset, + s_rule[i].timeout, + s_rule[i].table_name, + s_rule[i].rule_id); + expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); + expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0); + (*cnt)++; + append_cmd_cnt++; + } + if(s_rule[i].label_id>0) + { + redisAppendCommand(ctx,"ZADD %s %d %s,%lu", + mr_label_sset, + s_rule[i].label_id, + s_rule[i].table_name, + s_rule[i].rule_id); + expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); + expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0); + + (*cnt)++; + + append_cmd_cnt++; + } + break; + case MAAT_OP_DEL: + redisAppendCommand(ctx,"RENAME %s:%s,%lu %s:%s,%lu", + mr_key_prefix[MAAT_OP_ADD], + s_rule[i].table_name, + s_rule[i].rule_id, + mr_key_prefix[MAAT_OP_DEL], + s_rule[i].table_name, + s_rule[i].rule_id + ); + expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0); + (*cnt)++; + append_cmd_cnt++; + + redisAppendCommand(ctx,"EXPIRE %s:%s,%lu %d", + mr_key_prefix[MAAT_OP_DEL], + s_rule[i].table_name, + s_rule[i].rule_id, + MAAT_REDIS_SYNC_TIME); + expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); + (*cnt)++; + append_cmd_cnt++; + + //NX: Don't update already exisiting elements. Always add new elements. + redisAppendCommand(ctx,"RPUSH %s DEL,%s,%lu", + transaction_list, + s_rule[i].table_name, + s_rule[i].rule_id); + expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); + (*cnt)++; + append_cmd_cnt++; + + // Try to remove from expiration sorted set, no matter wheather it exists or not. + redisAppendCommand(ctx,"ZREM %s %s,%lu", + mr_expire_sset, + s_rule[i].table_name, + s_rule[i].rule_id); + expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); + (*cnt)++; + append_cmd_cnt++; + + // Try to remove from label sorted set, no matter wheather it exists or not. + redisAppendCommand(ctx,"ZREM %s %s,%lu", + mr_label_sset, + s_rule[i].table_name, + s_rule[i].rule_id); + expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); + (*cnt)++; + append_cmd_cnt++; + break; + case MAAT_OP_RENEW_TIMEOUT: + if(renew_allowed!=1) + { + continue; + } + //s_rule[i].timeout>0 was checked by caller. + redisAppendCommand(ctx,"ZADD %s %lld %s,%lu", + mr_expire_sset, + s_rule[i].timeout, + s_rule[i].table_name, + s_rule[i].rule_id); + expected_reply_add(expect_reply+*cnt, -1, REDIS_REPLY_INTEGER, 0); + (*cnt)++; + append_cmd_cnt++; + + break; + default: + assert(0); + break; + } + } + for(i=0;i0) + { + snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", transaction_version); + } + while(success_cntelements==multi_cmd_cnt); + for(i=0;ielement[i]; + //failed is acceptable + //or transaciton is success + //or continuation of last failed + if(expected_reply[i].srule_seq==-1||1==mr_operation_success(p, expected_reply+i)||last_failed==expected_reply[i].srule_seq) + { + continue; + } + rule_seq=expected_reply[i].srule_seq; + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_command, + "%s %s %d failed, rule id maybe conflict or not exist.", + mr_op_str[s_rule[rule_seq].op], + s_rule[rule_seq].table_name,s_rule[rule_seq].rule_id); + success_cnt--; + last_failed=rule_seq; + } + } + else + { + success_cnt=-1; + } + if(transaction_version>0) + { + transaction_finished_version=read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]); + MESA_handle_runtime_log(logger, RLOG_LV_DEBUG, maat_command, + "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld ", + transaction_version, + transaction_finished_version); + } + + freeReplyObject(transaction_reply); + transaction_reply=NULL; + +error_out: + if(renew_num>0&&renew_allowed!=1) + { + for(i=0;i<(unsigned int)serial_rule_num;i++) + { + if(s_rule[i].op==MAAT_OP_RENEW_TIMEOUT) + { + MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_command + ,"%s %s %d is not allowed due to lock contention.",mr_op_str[MAAT_OP_RENEW_TIMEOUT] + , s_rule[i].table_name,s_rule[i].rule_id); + } + } + if(success_cnt>0) + { + success_cnt-=renew_num; + } + } + free(expected_reply); + return success_cnt; +} + + +void check_maat_expiration(redisContext *ctx, void *logger) +{ + unsigned int i=0,s_rule_num=0; + UNUSED int ret=0; + int success_cnt=0; + redisReply* data_reply=NULL; + struct serial_rule_t* s_rule=NULL; + long long server_time=0; + + server_time=redis_server_time(ctx); + if(!server_time) + { + return; + } + data_reply=_wrap_redisCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_expire_sset,server_time); + if(data_reply->type!=REDIS_REPLY_ARRAY||data_reply->elements==0) + { + freeReplyObject(data_reply); + data_reply=NULL; + return; + } + s_rule_num=data_reply->elements; + s_rule=(struct serial_rule_t*)calloc(sizeof(struct serial_rule_t),s_rule_num); + for(i=0;ielement[i]->str,"%[^,],%ld",s_rule[i].table_name,&(s_rule[i].rule_id)); + assert(ret==2); + } + freeReplyObject(data_reply); + data_reply=NULL; + success_cnt=exec_serial_rule(ctx,s_rule, s_rule_num,server_time, logger); + + if(success_cnt==(int)s_rule_num) + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor + ,"Succesfully expired %d rules in Redis.", s_rule_num); + } + else + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor + ,"Failed to expired %d of %d rules in Redis, try later.", s_rule_num-success_cnt,s_rule_num); + } + + free(s_rule); + return; +} +void cleanup_update_status(redisContext *ctx, void *logger) +{ + redisReply* reply=NULL,*sub_reply=NULL; + int append_cmd_cnt=0,i=0; + long long server_time=0, version_upper_bound=0,version_lower_bound=0,version_num=0,entry_num=0; + + server_time=redis_server_time(ctx); + if(!server_time) + { + return; + } + reply=_wrap_redisCommand(ctx,"MULTI"); + freeReplyObject(reply); + reply=NULL; + redisAppendCommand(ctx, "ZRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME); + append_cmd_cnt++; + redisAppendCommand(ctx, "ZREMRANGEBYSCORE %s -inf %lld",mr_version_sset,server_time-MAAT_REDIS_SYNC_TIME); + append_cmd_cnt++; + //consume reply "OK" and "QUEUED". + for(i=0;itype!=REDIS_REPLY_ARRAY) + { + goto error_out; + } + sub_reply=reply->element[0]; + if(sub_reply->type!=REDIS_REPLY_ARRAY) + { + goto error_out; + } + version_num=sub_reply->elements; + if(version_num==0) + { + goto error_out; + } + version_lower_bound=read_redis_integer(sub_reply->element[0]); + version_upper_bound=read_redis_integer(sub_reply->element[sub_reply->elements-1]); + freeReplyObject(reply); + reply=NULL; + + //To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally. + reply=_wrap_redisCommand(ctx,"ZREMRANGEBYSCORE %s %lld %lld",mr_status_sset,version_lower_bound,version_upper_bound); + entry_num=read_redis_integer(reply); + freeReplyObject(reply); + reply=NULL; + + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor + ,"Clean up update status from version %lld to %lld (%lld versions, %lld entries)." + ,version_lower_bound + ,version_upper_bound + ,version_num + ,entry_num); + return; + +error_out: + freeReplyObject(reply); + reply=NULL; + return; +} +const char* find_Nth_column(const char* line, int Nth, int* column_len) +{ + size_t i=0; + int j=0; + int start=0, end=0; + size_t line_len= strlen(line); + for(i=0;in_foreign; i++) + { + fn_size+=strlen(p->f_keys[i].filename); + } + + rewrite_line=(char*)calloc(sizeof(char), strlen(p->table_line)+fn_size); + pos_origin_line=p->table_line; + pos_rewrite_line=rewrite_line; + + for(i=0; in_foreign; i++) + { + origin_column=find_Nth_column(p->table_line, p->f_keys[i].column, &origin_column_size); + strncat(pos_rewrite_line, pos_origin_line, origin_column-pos_origin_line); + pos_rewrite_line+=origin_column-pos_origin_line; + pos_origin_line=origin_column+origin_column_size; + + strncat(pos_rewrite_line, p->f_keys[i].filename, strlen(p->f_keys[i].filename)); + pos_rewrite_line+=strlen(p->f_keys[i].filename); + } + strncat(pos_rewrite_line, pos_origin_line, strlen(p->table_line)-(pos_origin_line-p->table_line)); + + free(p->table_line); + p->table_line=rewrite_line; + return; +} +void _get_foregin_keys(struct serial_rule_t* p_rule, int* foreign_columns, int n_foreign, const char* dir, void* logger) +{ + int i=0; + const char* p_foreign=NULL; + int foreign_key_size=0; + p_rule->f_keys=ALLOC(struct foreign_key, n_foreign); + for(i=0; itable_line, foreign_columns[i], &foreign_key_size); + if(p_foreign==NULL) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Get %s,%d foreign keys failed: No %dth column.", + p_rule->table_name, p_rule->rule_id, foreign_columns[i]); + continue; + } + if(0==strncasecmp(p_foreign, "null", strlen("null"))) + {//emtpy file + continue; + } + if(0!=strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor, + "Get %s,%d foreign key failed: Invalid source prefix %s.", + p_rule->table_name, p_rule->rule_id, p_foreign); + continue; + } + p_rule->f_keys[p_rule->n_foreign].column=foreign_columns[i]; + foreign_key_size=foreign_key_size-strlen(foreign_source_prefix); + p_foreign+=strlen(foreign_source_prefix); + if(0!=strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix))) + { + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, + "%s, %d foreign key prefix %s is not recommended.", + p_rule->table_name, p_rule->rule_id, p_foreign); + } + p_rule->f_keys[p_rule->n_foreign].key=ALLOC(char, foreign_key_size+1); + memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size); + p_rule->f_keys[p_rule->n_foreign].filename=get_foreign_cont_filename(p_rule->table_name, p_rule->rule_id, p_rule->f_keys[p_rule->n_foreign].key, dir); + p_rule->n_foreign++; + } + if(p_rule->n_foreign==0) + { + free(p_rule->f_keys); + p_rule->f_keys=NULL; + } + return; +} +int get_foreign_keys_define(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, _Maat_feather_t* feather, const char* dir,void *logger) +{ + int i=0; + int rule_with_foreign_key=0; + struct Maat_table_schema* p_table=NULL; + struct plugin_table_schema* plugin_desc=NULL; + for(i=0; itable_mgr, rule_list[i].table_name); + if(!p_table||p_table->table_type!=TABLE_TYPE_PLUGIN) + { + continue; + } + plugin_desc= &(p_table->plugin); + if(plugin_desc->n_foreign==0) + { + continue; + } + _get_foregin_keys(rule_list+i, plugin_desc->foreign_columns, plugin_desc->n_foreign, dir, logger); + rule_with_foreign_key++; + } + return rule_with_foreign_key; +} +int get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, const char* dir,void *logger) +{ + int i=0, j=0, foreign_key_size=0; + int rule_with_foreign_key=0; + const char* p_foreign=NULL; + + int n_foreign=0; + int foreign_columns[MAX_FOREIGN_CLMN_NUM]; + for(i=0; i(int)strlen(foreign_source_prefix)&&0==strncmp(p_foreign,foreign_source_prefix, strlen(foreign_source_prefix))) + { + foreign_columns[n_foreign]=j; + n_foreign++; + } + j++; + }while(p_foreign!=NULL&&n_foreign0) + { + _get_foregin_keys(rule_list+i, foreign_columns, n_foreign,dir,logger); + rule_with_foreign_key++; + } + } + return rule_with_foreign_key; +} + +struct foreign_conts_track +{ + int rule_idx; + int foreign_idx; +}; +void _get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger) +{ + int i=0, j=0; + UNUSED int ret=0; + int key_num=0; + struct foreign_conts_track* track=ALLOC(struct foreign_conts_track, rule_num*MAX_FOREIGN_CLMN_NUM); + char redis_cmd[256]; + redisReply* reply=NULL; + struct serial_rule_t*p=NULL; + FILE* fp=NULL; + struct stat file_info; + + for(i=0;in_foreign==0) + { + continue; + } + if(p->op==MAAT_OP_DEL) + { + for(j=0; jn_foreign; j++) + { + if(rule_list[i].f_keys[j].filename==NULL) + { + continue; + } + ret=stat(p->f_keys[j].filename, &file_info); + if(ret==0) + { + continue; + } + snprintf(redis_cmd,sizeof(redis_cmd),"GET %s", p->f_keys[j].key); + ret=redisAppendCommand(ctx, redis_cmd); + track[key_num].rule_idx=i; + track[key_num].foreign_idx=j; + key_num++; + assert(ret==REDIS_OK); + } + } + } + for(i=0;itype!=REDIS_REPLY_STRING) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor + ,"Get %s,%d foreign key %s content failed." + ,rule_list[track[i].rule_idx].table_name + ,rule_list[track[i].rule_idx].rule_id + ,rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key); + continue; + } + else + { + p=rule_list+track[i].rule_idx; + fp=fopen(p->f_keys[track[i].foreign_idx].filename, "w"); + if(fp==NULL) + { + MESA_handle_runtime_log(logger, RLOG_LV_FATAL, maat_redis_monitor + , "Write foreign content failed: fopen %s error." + , p->f_keys[track[i].foreign_idx]); + } + else + { + fwrite(reply->str, 1, reply->len, fp); + fclose(fp); + fp=NULL; + if(print_fn==1) + { + printf("Written foreign content %s\n",p->f_keys[track[i].foreign_idx].filename); + } + } + } + freeReplyObject(reply); + reply=NULL; + } + + free(track); + return; +} +void get_foreign_conts(redisContext *ctx, struct serial_rule_t* rule_list, int rule_num, int print_fn, void *logger) +{ + int max_redis_batch=4*1024,batch_cnt=0; + int success_cnt=0; + while(success_cntlogger; + + if(mr_ctx->write_ctx!=NULL&&mr_ctx->write_ctx->err==0)//authorized to write + { + //For thread safe, deliberately use redis_read_ctx but not redis_write_ctx. + if(1==redlock_try_lock(mr_ctx->read_ctx, mr_expire_lock, mr_expire_lock_timeout)) + { + check_maat_expiration(mr_ctx->read_ctx, logger); + cleanup_update_status(mr_ctx->read_ctx, logger); + redlock_unlock(mr_ctx->read_ctx, mr_expire_lock); + } + } + if(mr_ctx->read_ctx==NULL||mr_ctx->read_ctx->err) + { + if(time(NULL)-mr_ctx->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL) + { + return; + } + mr_ctx->last_reconnect_time=time(NULL); + if(mr_ctx->read_ctx!=NULL) + { + redisFree(mr_ctx->read_ctx); + } + MESA_handle_runtime_log(logger, RLOG_LV_INFO, maat_redis_monitor, "Reconnecting..."); + mr_ctx->read_ctx=connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db, feather->logger); + if(mr_ctx->read_ctx==NULL) + { + return; + } + else + { + version=0;//Trigger full update when reconnect to redis. + } + } + + rule_num=get_rm_key_list(mr_ctx->read_ctx, version, feather->load_version_from, &new_version, feather->table_mgr, &rule_list, &update_type, logger, feather->cumulative_update_off); + if(rule_num<0)//redis communication error + { + return; + } + feather->load_version_from=0;//only valid for one time. + if(rule_num==0&&update_type==CM_UPDATE_TYPE_INC)//error or nothing changed + { + return; + } + if(rule_num>0) + { + ret=get_maat_redis_value(mr_ctx->read_ctx, rule_list, rule_num, logger, 0); + if(ret<0) + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Get Redis value failed, abandon update."); + goto clean_up; + } + for(i=0;i0) + { + + MESA_handle_runtime_log(logger,RLOG_LV_FATAL,maat_redis_monitor,"%d of %d rules are empty.",empty_value_num,rule_num); + } + ret=get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, feather, feather->foreign_cont_dir, logger); + if(ret>0) + { + get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, logger); + } + } + start(new_version,update_type,u_para); + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Start %s update: %lld -> %lld (%d entries).", + update_type==CM_UPDATE_TYPE_INC?"INC":"FULL",version,new_version,rule_num); + for(i=0;itable_mgr, rule_list[i].table_name); + if(table_id<0)//Unrecognized table. + { + continue; + } + table_type=Maat_table_get_type_by_id(feather->table_mgr, table_id); + if(rule_list[i].op==MAAT_OP_DEL) + { + + scan_type=Maat_table_get_scan_type(table_type); + table_schema=Maat_table_get_scan_by_id(feather->table_mgr, table_id, scan_type, NULL); + valid_column=Maat_table_xx_plugin_table_get_valid_flag_column(table_schema); + ret=invalidate_line(rule_list[i].table_line, table_type, valid_column); + if(ret<0) + { + MESA_handle_runtime_log(logger,RLOG_LV_INFO,maat_redis_monitor,"Invalidate line failed, invaid format %s ." + ,rule_list[i].table_line); + continue; + } + } + if(rule_list[i].n_foreign>0) + { + rewrite_table_line_with_foreign(rule_list+i); + } + update(rule_list[i].table_name,rule_list[i].table_line,u_para); + } + finish(u_para); + +clean_up: + for(i=0;itable_name!=NULL) + { + dst->table_name=_maat_strdup(src->table_name); + } + switch(dst->region_type) + { + case REGION_IP: + dst->ip_rule.src_ip=_maat_strdup(src->ip_rule.src_ip); + dst->ip_rule.mask_src_ip=_maat_strdup(src->ip_rule.mask_src_ip); + dst->ip_rule.dst_ip=_maat_strdup(src->ip_rule.dst_ip); + dst->ip_rule.mask_dst_ip=_maat_strdup(src->ip_rule.mask_dst_ip); + break; + case REGION_EXPR: + dst->expr_rule.keywords=_maat_strdup(src->expr_rule.keywords); + dst->expr_rule.district=_maat_strdup(src->expr_rule.district); + break; + case REGION_INTERVAL: + break; + case REGION_DIGEST: + dst->digest_rule.digest_string=_maat_strdup(src->digest_rule.digest_string); + break; + case REGION_SIMILARITY: + dst->similarity_rule.target=_maat_strdup(src->similarity_rule.target); + break; + default: + assert(0); + } + return; +} +void _maat_empty_region(struct Maat_region_t* p) +{ + free((char*)p->table_name); + p->table_name=NULL; + switch(p->region_type) + { + case REGION_IP: + free((char*)p->ip_rule.src_ip); + free((char*)p->ip_rule.mask_src_ip); + free((char*)p->ip_rule.dst_ip); + free((char*)p->ip_rule.mask_dst_ip); + break; + case REGION_EXPR: + free((char*)p->expr_rule.keywords); + free((char*)p->expr_rule.district); + break; + case REGION_INTERVAL: + break; + case REGION_DIGEST: + free((char*)p->digest_rule.digest_string); + break; + case REGION_SIMILARITY: + free((char*)p->similarity_rule.target); + break; + default: + assert(0); + } + memset(p,0,sizeof(struct Maat_region_t)); + return; + +} +int Maat_command_raw_set_lines(Maat_feather_t feather,const struct Maat_cmd_line** line_rule, size_t n_line ,enum MAAT_OPERATION op) +{ + size_t i=0; + _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + int ret=0, table_id=0,success_cnt=0; + struct serial_rule_t *s_rule=NULL; + long long server_time=0,absolute_expire_time=0; + redisContext* write_ctx=get_redis_ctx_for_write(_feather); + if(write_ctx==NULL) + { + return -1; + } + server_time=redis_server_time(write_ctx); + if(!server_time) + { + return -1; + } + s_rule=ALLOC(struct serial_rule_t, n_line); + for(i=0;itable_mgr, line_rule[i]->table_name); + if(table_id<0) + { + MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, + "Command raw set line id %d failed: unknown table %s.", + line_rule[i]->rule_id, + line_rule[i]->table_name); + ret=-1; + goto error_out; + } + if(op==MAAT_OP_RENEW_TIMEOUT) + { + assert(line_rule[i]->expire_after>0); + } + if(line_rule[i]->expire_after>0) + { + absolute_expire_time=server_time+line_rule[i]->expire_after; + } + set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name, + line_rule[i]->table_line, absolute_expire_time); + } + success_cnt=exec_serial_rule(write_ctx,s_rule, n_line,server_time,_feather->logger); + if(success_cnt<0||(size_t)success_cnt!=n_line)//error + { + ret=-1; + goto error_out; + } + ret=success_cnt; + _feather->line_cmd_acc_num+=success_cnt; + +error_out: + for(i=0;itable_mgr, line_rule[i]->table_name); + if(table_id<0) + { + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command + ,"Command set line id %d failed: unknown table %s." + , line_rule[i]->rule_id + , line_rule[i]->table_name); + ret=-1; + goto error_out; + } + p_table=Maat_table_get_by_id_raw(_feather->table_mgr, table_id); + if(!p_table) + { + ret=-1; + goto error_out; + } + int valid_flag_column=0; + + valid_flag_column=Maat_table_xx_plugin_table_get_valid_flag_column(p_table); + if(valid_flag_column<0) + { + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command + ,"Command set line id %d failed: table %s is not a plugin or ip_plugin table." + , line_rule[i]->rule_id + , line_rule[i]->table_name); + ret=-1; + goto error_out; + + } + + if(op==MAAT_OP_ADD) + { + ret=get_valid_flag_offset(line_rule[i]->table_line + , p_table->table_type + , valid_flag_column); + if(ret<0|| + (op==MAAT_OP_ADD&&line_rule[i]->table_line[ret]!='1')) + { + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_command + ,"Command set line %s %d failed: illegal valid flag." + , line_rule[i]->table_name, line_rule[i]->rule_id); + ret=-1; + goto error_out; + } + } + if(op==MAAT_OP_RENEW_TIMEOUT) + { + assert(line_rule[i]->expire_after>0); + } + if(line_rule[i]->expire_after>0) + { + absolute_expire_time=server_time+line_rule[i]->expire_after; + } + if(plugin_desc && plugin_desc->n_foreign>0) + { + for(j=0;jn_foreign;j++) + { + p_foreign=find_Nth_column(line_rule[i]->table_line, plugin_desc->foreign_columns[j], &foreign_key_size); + if(p_foreign==NULL) + { + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL, maat_command + , "Command set line %s %d failed: No %dth column." + , line_rule[i]->table_name, line_rule[i]->rule_id + , plugin_desc->foreign_columns[j]); + ret=-1; + goto error_out; + } + if(0!=strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) + { + MESA_handle_runtime_log(_feather->logger,RLOG_LV_FATAL,maat_redis_monitor + ,"Command set line %s %d failed: Source prefix %s is mandatory." + , line_rule[i]->table_name, line_rule[i]->rule_id, foreign_source_prefix); + ret=-1; + goto error_out; + } + } + + } + set_serial_rule(s_rule+i, op,line_rule[i]->rule_id,line_rule[i]->label_id, line_rule[i]->table_name, + line_rule[i]->table_line, absolute_expire_time); + } + success_cnt=exec_serial_rule(write_ctx,s_rule, line_num,server_time,_feather->logger); + if(success_cnt<0||success_cnt!=line_num)//error + { + ret=-1; + goto error_out; + } + ret=success_cnt; + _feather->line_cmd_acc_num+=success_cnt; + +error_out: + for(i=0;imr_ctx.write_ctx; + if(ctx==NULL) + { + MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, "%s failed: Redis is not connected.", __FUNCTION__); + return -1; + } + const char *arg_vec[3]; + size_t len_vec[3]; + arg_vec[0] = "SET"; + len_vec[0] = strlen("SET"); + + arg_vec[1] = key; + len_vec[1] = strlen(key); + + arg_vec[2] = value; + len_vec[2] = size; + + redisReply *reply=NULL; + if(0!=strncmp(key, foreign_key_prefix, strlen(foreign_key_prefix))) + { + MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command, "Invalid File key, prefix %s is mandatory.", foreign_key_prefix); + return -1; + } + switch(op) + { + case MAAT_OP_ADD: + reply= (redisReply *)redisCommandArgv(ctx, sizeof(arg_vec) / sizeof(arg_vec[0]), arg_vec, len_vec); + break; + case MAAT_OP_DEL: + reply=_wrap_redisCommand(ctx,"EXPIRE %s %d", key, MAAT_REDIS_SYNC_TIME); + break; + default: + return -1; + break; + } + if(reply==NULL||reply->type==REDIS_REPLY_NIL||reply->type==REDIS_REPLY_ERROR) + { + MESA_handle_runtime_log(_feather->logger, RLOG_LV_FATAL, maat_command,"Set file failed, maybe Redis is busy."); + freeReplyObject(reply); + reply=NULL; + return -1; + } + freeReplyObject(reply); + reply=NULL; + return 1; +} + +long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment) +{ + _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + redisReply* data_reply=NULL; + long long result=0; + redisContext* write_ctx=get_redis_ctx_for_write(_feather); + + if(write_ctx==NULL) + { + return -1; + } + data_reply=_wrap_redisCommand(write_ctx,"INCRBY %s %d", key, increment); + if(data_reply->type==REDIS_REPLY_INTEGER) + { + result=data_reply->integer; + } + else + { + result=-1; + } + freeReplyObject(data_reply); + data_reply=NULL; + return result; +} +int Maat_command_get_new_group_id(Maat_feather_t feather) +{ + int group_id=0; + group_id=(int) Maat_cmd_incrby(feather, mr_group_id_var, 1); + return group_id; +} +int Maat_command_get_new_region_id(Maat_feather_t feather) +{ + int region_id=0; + region_id=(int) Maat_cmd_incrby(feather, mr_region_id_var, 1); + return region_id; +} + +void Maat_cmd_key_free(struct Maat_cmd_key**keys, int size) +{ + int i=0; + struct Maat_cmd_key* p=*keys; + for(i=0; itable_name); + p->table_name=NULL; + p->rule_id=0; + } + free(*keys); + *keys=NULL; + return; +} + +int Maat_cmd_key_select(Maat_feather_t feather, int label_id, struct Maat_cmd_key** keys) +{ + _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + redisReply* data_reply=NULL; + char* tmp=NULL; + unsigned int i=0; + struct Maat_cmd_key* result=NULL; + int result_cnt=0; + redisContext* write_ctx=get_redis_ctx_for_write(_feather); + if(write_ctx==NULL) + { + return -1; + } + + data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d", + mr_label_sset, + label_id, + label_id); + result_cnt=data_reply->elements; + result=ALLOC(struct Maat_cmd_key, data_reply->elements); + for(i=0;ielements;i++) + { + result[i].table_name=_maat_strdup(data_reply->element[i]->str); + tmp=strchr(result[i].table_name, ','); + if(tmp!=NULL) + { + *tmp='\0'; + tmp++; + result[i].rule_id=atoi(tmp); + } + else// old version compatible + { + result[i].rule_id=atoi(result[i].table_name); + free(result[i].table_name); + result[i].table_name=NULL; + } + } + freeReplyObject(data_reply); + data_reply=NULL; + + *keys=result; + return result_cnt; +} + +int redis_flush_DB(redisContext* ctx, int db_index, void* logger) +{ + redisReply* data_reply=NULL; + long long maat_redis_version=0, dbsize=0; + int append_cmd_cnt=0, i=0,ret=0; + int redis_transaction_success=1; + + data_reply=_wrap_redisCommand(ctx, "WATCH MAAT_VERSION"); + freeReplyObject(data_reply); + data_reply=NULL; + data_reply=_wrap_redisCommand(ctx, "GET MAAT_VERSION"); + if(data_reply->type==REDIS_REPLY_NIL) + { + maat_redis_version=0; + } + else + { + maat_redis_version=read_redis_integer(data_reply); + maat_redis_version++; + freeReplyObject(data_reply); + data_reply=NULL; + } + data_reply=_wrap_redisCommand(ctx, "DBSIZE"); + dbsize=read_redis_integer(data_reply); + freeReplyObject(data_reply); + data_reply=NULL; + + data_reply=_wrap_redisCommand(ctx,"MULTI"); + freeReplyObject(data_reply); + data_reply=NULL; + + redisAppendCommand(ctx,"FLUSHDB"); + append_cmd_cnt++; + redisAppendCommand(ctx,"SET MAAT_VERSION %lld",maat_redis_version); + append_cmd_cnt++; + redisAppendCommand(ctx,"SET MAAT_PRE_VER %lld",maat_redis_version); + append_cmd_cnt++; + redisAppendCommand(ctx,"SET %s 1", mr_region_id_var); + append_cmd_cnt++; + redisAppendCommand(ctx,"SET %s 1", mr_group_id_var); + append_cmd_cnt++; + redisAppendCommand(ctx,"EXEC"); + append_cmd_cnt++; + for(i=0;iqueue); + batch->feather=(struct _Maat_feather_t *)feather; + redisContext* write_ctx=get_redis_ctx_for_write(batch->feather); + if(write_ctx==NULL) + { + free(batch); + return NULL; + } + batch->server_time=redis_server_time(write_ctx); + if(!batch->server_time) + { + free(batch); + return NULL; + } + return batch; +} + +int Maat_command_batch_set_region(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id) +{ + struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); + long long absolute_expire_time=0; + char line[MAX_TABLE_LINE_SIZE]; + + serialize_region(region, group_id, line, sizeof(line)); + + set_serial_rule(s_rule, op, region->region_id, 0, region->table_name, + line, absolute_expire_time); + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); + batch->batch_size++; + return 0; + +} +#define TO_GROUP2X_KEY(group_id, parent_id) ((unsigned long)group_id<<32|parent_id) + +int Maat_command_batch_set_group2group(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g) +{ + struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); + long long absolute_expire_time=0; + char line[MAX_TABLE_LINE_SIZE]; + + serialize_group2group(op, g2g, line, sizeof(line)); + + set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2g->group_id, g2g->superior_group_id), 0, g2g->table_name, + line, absolute_expire_time); + + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); + batch->batch_size++; + return 0; +} +int Maat_command_batch_set_group2compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c) +{ + struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); + long long absolute_expire_time=0; + char line[MAX_TABLE_LINE_SIZE]; + + serialize_group2compile(op, g2c, line, sizeof(line)); + set_serial_rule(s_rule, op, TO_GROUP2X_KEY(g2c->group_id, g2c->compile_id), 0, g2c->table_name, + line, absolute_expire_time); + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); + batch->batch_size++; + return 0; +} +int Maat_command_batch_set_compile(struct Maat_command_batch* batch, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after) +{ + + struct serial_rule_t* s_rule=ALLOC(struct serial_rule_t, 1); + long long absolute_expire_time=0; + char line[MAX_TABLE_LINE_SIZE]; + serialize_compile(compile, huge_service_defined, clause_num, op, line, sizeof(line)); + + if(expire_after>0) + { + absolute_expire_time=batch->server_time+expire_after; + } + set_serial_rule(s_rule, op, compile->config_id, label_id, table_name, + line, absolute_expire_time); + + TAILQ_INSERT_TAIL(&batch->queue, s_rule, entries); + batch->batch_size++; + return 0; +} +int Maat_command_batch_commit(struct Maat_command_batch* batch) +{ + struct serial_rule_t* s_rule_array=ALLOC(struct serial_rule_t, batch->batch_size); + int i=0; + redisContext* write_ctx=get_redis_ctx_for_write(batch->feather); + struct serial_rule_t * tmp = TAILQ_FIRST(&batch->queue); + + while(tmp != NULL) + { + TAILQ_REMOVE(&batch->queue, tmp, entries); + memcpy(s_rule_array+i, tmp, sizeof(*tmp)); + free(tmp); + tmp = TAILQ_FIRST(&batch->queue); + i++; + } + assert(i==batch->batch_size); + exec_serial_rule(write_ctx, s_rule_array, batch->batch_size, batch->server_time, batch->feather->logger); + for(i=0; ibatch_size; i++) + { + empty_serial_rules(s_rule_array+i); + } + free(s_rule_array); + free(batch); + return i; +} + +int Maat_command_raw_set_compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_rule_t* compile, const char* table_name, const char * huge_service_defined, int clause_num, int label_id, int expire_after) +{ + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); + Maat_command_batch_set_compile(batch, op, compile, table_name, huge_service_defined, clause_num, label_id, expire_after); + Maat_command_batch_commit(batch); + return 0; +} +int Maat_command_raw_set_region(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_region* region, int group_id) +{ + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); + Maat_command_batch_set_region(batch, op, region, group_id); + Maat_command_batch_commit(batch); + return 0; +} +int Maat_command_raw_set_group2compile(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2compile* g2c) +{ + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); + Maat_command_batch_set_group2compile(batch, op, g2c); + Maat_command_batch_commit(batch); + return 0; +} +int Maat_command_raw_set_group2group(Maat_feather_t feather, enum MAAT_OPERATION op, const struct Maat_cmd_group2group* g2g) +{ + + struct Maat_command_batch* batch=NULL; + batch=Maat_command_batch_new(feather); + Maat_command_batch_set_group2group(batch, op, g2g); + Maat_command_batch_commit(batch); + return 0; +} + +int Maat_cmd_flushDB(Maat_feather_t feather) +{ + _Maat_feather_t* _feather=(_Maat_feather_t*)feather; + int ret=0; + redisContext* write_ctx=get_redis_ctx_for_write(_feather); + if(write_ctx==NULL) + { + return -1; + } + do + { + ret=redis_flush_DB(_feather->mr_ctx.write_ctx, _feather->mr_ctx.redis_db, _feather->logger); + }while(ret==0); + return 0; +} + diff --git a/src/entry/Maat_rule.cpp b/src/entry/Maat_rule.cpp index 29573c6..05e67d9 100644 --- a/src/entry/Maat_rule.cpp +++ b/src/entry/Maat_rule.cpp @@ -57,7 +57,7 @@ extern "C" } #endif -int MAAT_FRAME_VERSION_3_6_4_20220423=1; +int MAAT_FRAME_VERSION_3_6_5_20220426=1; int is_valid_table_name(const char* str) { diff --git a/src/entry/Maat_table_runtime.cpp b/src/entry/Maat_table_runtime.cpp index a87148b..657a886 100644 --- a/src/entry/Maat_table_runtime.cpp +++ b/src/entry/Maat_table_runtime.cpp @@ -58,6 +58,11 @@ struct ip_rule* ip_plugin_row2ip_rule(const struct ip_plugin_table_schema* sche range_rule->user_tag=NULL; return range_rule; } +void ip_rule_free(struct ip_rule* p) +{ + free(p); + return; +} struct Maat_table_runtime_manager { struct Maat_table_runtime** table_rt; @@ -452,7 +457,12 @@ void Maat_table_runtime_fqdn_plugin_new_row(struct Maat_table_runtime* table_rt, if(atoi(row+is_valid_offset)==1)//add { fqdn_rule=fqdn_rule_new((unsigned int)atoi(row+row_id_offset), row+fqdn_offset, fqdn_len, atoi(row+is_suffix_flag_offset)); - EX_data_rt_row2EX_data(fqdn_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, fqdn_rule, logger); + ret=EX_data_rt_row2EX_data(fqdn_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, fqdn_rule, logger); + if(ret<0) + { + fqdn_rule_free(fqdn_rule); + fqdn_rule=NULL; + } } else { @@ -690,13 +700,19 @@ void Maat_table_runtime_ip_plugin_new_row(struct Maat_table_runtime* table_rt, s } if(atoi(row+is_valid_offset)==1)//add { - EX_data_rt_row2EX_data(ip_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, ip_rule, logger); + ret=EX_data_rt_row2EX_data(ip_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, ip_rule, logger); + if(ret<0) + { + ip_rule_free(ip_rule); + ip_rule=NULL; + } } else { - EX_data_rt_delete_by_row(ip_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, logger); - free(ip_rule); + ret=EX_data_rt_delete_by_row(ip_plugin_rt->ex_data_rt, row, row+row_id_offset, row_id_len, logger); + ip_rule_free(ip_rule); + ip_rule=NULL; } table_rt->origin_rule_num=EX_data_rt_get_ex_container_count(ip_plugin_rt->ex_data_rt); }