diff --git a/src/inc_internal/maat_redis_monitor.h b/src/inc_internal/maat_redis_monitor.h index 144024d..d1d0b99 100644 --- a/src/inc_internal/maat_redis_monitor.h +++ b/src/inc_internal/maat_redis_monitor.h @@ -81,7 +81,7 @@ void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule); int maat_get_rm_key_list(redisContext *c, long long instance_version, long long *new_version, struct table_manager *tbl_mgr, struct serial_rule **list, int *update_type, - int cumulative_off, struct log_handle *logger); + struct log_handle *logger); void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx, void (*start_fn)(long long, int, void *), diff --git a/src/inc_internal/maat_rule.h b/src/inc_internal/maat_rule.h index 71ce688..6f6e751 100644 --- a/src/inc_internal/maat_rule.h +++ b/src/inc_internal/maat_rule.h @@ -124,7 +124,6 @@ struct maat_options { int deferred_load_on; int maat_json_is_gzipped; - int cumulative_update_off; //Default: cumulative update on int gc_timeout_ms; int rule_effect_interval_ms; diff --git a/src/maat_redis_monitor.c b/src/maat_redis_monitor.c index 590e096..77cb4d3 100644 --- a/src/maat_redis_monitor.c +++ b/src/maat_redis_monitor.c @@ -50,8 +50,9 @@ struct expected_reply { redisReply possible_replies[REDIS_REPLY_SIZE]; }; -static char *get_foreign_cont_filename(const char *table_name, long long rule_id, - const char *foreign_key, const char *dir) +static char * +get_foreign_cont_filename(const char *table_name, long long rule_id, + const char *foreign_key, const char *dir) { char buffer[512] = {0}; @@ -63,7 +64,8 @@ static char *get_foreign_cont_filename(const char *table_name, long long rule_id return filename; } -static const char *maat_cmd_find_Nth_column(const char *line, int Nth, int *column_len) +static const char * +maat_cmd_find_Nth_column(const char *line, int Nth, int *column_len) { size_t i = 0; int j = 0; @@ -99,16 +101,17 @@ static const char *maat_cmd_find_Nth_column(const char *line, int Nth, int *colu return line + start; } -static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, - int n_foreign, const char *dir, struct log_handle *logger) +static void +get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, + int n_foreign, const char *dir, struct log_handle *logger) { int foreign_key_size = 0; p_rule->f_keys = ALLOC(struct foreign_key, n_foreign); for (int i = 0; i < n_foreign; i++) { - const char *p_foreign = maat_cmd_find_Nth_column(p_rule->table_line, - foreign_columns[i], - &foreign_key_size); + const char *p_foreign = + maat_cmd_find_Nth_column(p_rule->table_line, foreign_columns[i], + &foreign_key_size); if (NULL == p_foreign) { log_fatal(logger, MODULE_REDIS_MONITOR, "[%s:%d] Get %s,%lld foreign keys failed: No %dth column", @@ -122,10 +125,12 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, continue; } - if (0 != strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) { + if (0 != strncmp(p_foreign, foreign_source_prefix, + strlen(foreign_source_prefix))) { log_fatal(logger, MODULE_REDIS_MONITOR, - "[%s:%d] Get %s,%lld foreign key failed: Invalid source prefix %s", - __FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id, p_foreign); + "[%s:%d] Get %s,%lld foreign key failed: " + "Invalid source prefix %s", __FUNCTION__, __LINE__, + p_rule->table_name, p_rule->rule_id, p_foreign); continue; } @@ -133,18 +138,19 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, foreign_key_size = foreign_key_size - strlen(foreign_source_prefix); p_foreign += strlen(foreign_source_prefix); - if (0 != strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix))) { + if (0 != strncmp(p_foreign, foreign_key_prefix, + strlen(foreign_key_prefix))) { log_info(logger, MODULE_REDIS_MONITOR, "[%s:%d] %s, %lld foreign key prefix %s is not recommended", - __FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id, p_foreign); + __FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id, + p_foreign); } p_rule->f_keys[p_rule->n_foreign].key = ALLOC(char, foreign_key_size + 1); memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size); - p_rule->f_keys[p_rule->n_foreign].filename = get_foreign_cont_filename(p_rule->table_name, - p_rule->rule_id, - p_rule->f_keys[p_rule->n_foreign].key, - dir); + p_rule->f_keys[p_rule->n_foreign].filename = + get_foreign_cont_filename(p_rule->table_name, p_rule->rule_id, + p_rule->f_keys[p_rule->n_foreign].key, dir); p_rule->n_foreign++; } @@ -153,8 +159,9 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, } } -static int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list, - int rule_num, struct maat *maat_inst, const char *dir) +static int +get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list, + int rule_num, struct maat *maat_inst, const char *dir) { int rule_with_foreign_key = 0; @@ -163,29 +170,37 @@ static int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_l continue; } - int table_id = table_manager_get_table_id(maat_inst->tbl_mgr, rule_list[i].table_name); + int table_id = table_manager_get_table_id(maat_inst->tbl_mgr, + rule_list[i].table_name); void *schema = table_manager_get_schema(maat_inst->tbl_mgr, table_id); - enum table_type table_type = table_manager_get_table_type(maat_inst->tbl_mgr, table_id); + + enum table_type table_type = + table_manager_get_table_type(maat_inst->tbl_mgr, table_id); + if (!schema || table_type != TABLE_TYPE_PLUGIN) { continue; } int foreign_columns[8]; - int n_foreign_column = plugin_table_get_foreign_column((struct plugin_schema *)schema, - foreign_columns); + int n_foreign_column = + plugin_table_get_foreign_column((struct plugin_schema *)schema, + foreign_columns); if (0 == n_foreign_column) { continue; } - _get_foregin_keys(rule_list+i, foreign_columns, n_foreign_column, dir, maat_inst->logger); + get_foregin_keys(rule_list+i, foreign_columns, n_foreign_column, + dir, maat_inst->logger); rule_with_foreign_key++; } return rule_with_foreign_key; } -int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_list, - int rule_num, const char* dir, struct log_handle *logger) +int maat_get_foreign_keys_by_prefix(redisContext *ctx, + struct serial_rule *rule_list, + int rule_num, const char* dir, + struct log_handle *logger) { int j = 0; int foreign_key_size = 0; @@ -199,9 +214,13 @@ int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_ j = 1; n_foreign = 0; do { - p_foreign = maat_cmd_find_Nth_column(rule_list[i].table_line, j, &foreign_key_size); - if (p_foreign != NULL && foreign_key_size > (int)strlen(foreign_source_prefix) && - 0 == strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) { + p_foreign = maat_cmd_find_Nth_column(rule_list[i].table_line, j, + &foreign_key_size); + + if (p_foreign != NULL && + foreign_key_size > (int)strlen(foreign_source_prefix) && + 0 == strncmp(p_foreign, foreign_source_prefix, + strlen(foreign_source_prefix))) { foreign_columns[n_foreign] = j; n_foreign++; } @@ -209,7 +228,8 @@ int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_ } while (p_foreign != NULL && n_foreign < MAX_FOREIGN_CLMN_NUM); if (n_foreign > 0) { - _get_foregin_keys(rule_list+i, foreign_columns, n_foreign, dir, logger); + get_foregin_keys(rule_list+i, foreign_columns, n_foreign, + dir, logger); rule_with_foreign_key++; } } @@ -222,8 +242,9 @@ struct foreign_conts_track { int foreign_idx; }; -static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, - int rule_num, struct log_handle *logger) +static int +get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, + int rule_num, struct log_handle *logger) { int i = 0; int failed_cnt = 0; @@ -321,8 +342,10 @@ static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, return 0; } -int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list, - int rule_num, int print_process, struct log_handle *logger) +int maat_get_redis_value(redisContext *c, + struct serial_rule *rule_list, + int rule_num, int print_process, + struct log_handle *logger) { int max_redis_batch = 4096; int success_cnt = 0; @@ -330,7 +353,7 @@ int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list, while (success_cnt < rule_num) { int batch_cnt = MIN(rule_num-success_cnt, max_redis_batch); - int ret = _get_maat_redis_value(c, rule_list+success_cnt, batch_cnt, logger); + int ret = get_maat_redis_value(c, rule_list+success_cnt, batch_cnt, logger); if (ret < 0) { return -1; } else { @@ -352,15 +375,17 @@ int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list, return 0; } -static int get_inc_key_list(long long instance_version, long long target_version, - redisContext *c, struct serial_rule **list, - struct log_handle *logger) +static int +get_inc_key_list(long long instance_version, long long target_version, + redisContext *c, struct serial_rule **list, + struct log_handle *logger) { //Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version. //The elements are considered to be ordered from low to high scores(instance_version). - redisReply *reply = (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld", - mr_status_sset, instance_version, - target_version); + redisReply *reply = + (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld", + mr_status_sset, instance_version, + target_version); if (NULL == reply) { log_fatal(logger, MODULE_REDIS_MONITOR, "[%s:%d] GET %s failed with a NULL reply, error: %s", @@ -480,15 +505,15 @@ void maat_set_serial_rule(struct serial_rule *rule, enum maat_operation op, int maat_get_rm_key_list(redisContext *c, long long instance_version, long long *new_version, struct table_manager *tbl_mgr, struct serial_rule **list, int *update_type, - int cumulative_off, struct log_handle *logger) + struct log_handle *logger) { int rule_num = 0; - long long target_version = 0; struct serial_rule *s_rule_array = NULL; redisReply *reply = (redisReply *)redisCommand(c, "GET MAAT_VERSION"); if (reply != NULL) { - if (reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) { + if (reply->type == REDIS_REPLY_NIL || + reply->type == REDIS_REPLY_ERROR) { log_fatal(logger, MODULE_REDIS_MONITOR, "[%s:%d] GET MAAT_VERSION failed, maybe Redis is busy", __FUNCTION__, __LINE__); @@ -531,46 +556,33 @@ int maat_get_rm_key_list(redisContext *c, long long instance_version, goto FULL_UPDATE; } - if (redis_version > instance_version && 1 == cumulative_off) { - target_version = instance_version; - } else { - target_version = redis_version - 1; + /* redis_version > instance_version */ + rule_num = get_inc_key_list(instance_version, redis_version, c, + &s_rule_array, logger); + if (rule_num < 0) { + goto FULL_UPDATE; } - do { - target_version++; - rule_num = get_inc_key_list(instance_version, target_version, - c, &s_rule_array, logger); - if (rule_num > 0) { - break; - } else if (rule_num < 0) { - goto FULL_UPDATE; - } else { - //ret=0, nothing to do. - } - - } while (0 == rule_num && target_version <= redis_version && 1 == cumulative_off); - if (0 == rule_num) { log_info(logger, MODULE_REDIS_MONITOR, "Got nothing after ZRANGEBYSCORE %s (%lld %lld", - mr_status_sset, instance_version, target_version - 1); + mr_status_sset, instance_version, redis_version); return 0; } log_info(logger, MODULE_REDIS_MONITOR, "Inc Update from instance_version %lld to %lld (%d entries)", - instance_version, target_version, rule_num); + instance_version, redis_version, rule_num); *list = s_rule_array; *update_type = MAAT_UPDATE_TYPE_INC; - *new_version = target_version; + *new_version = redis_version; return rule_num; FULL_UPDATE: log_fatal(logger, MODULE_REDIS_MONITOR, - "Initiate full update from instance_version %lld to %lld", - instance_version, redis_version); + "Initiate full update from instance_version %lld to %lld", + instance_version, redis_version); size_t append_cmd_cnt = 0; int ret = redisAppendCommand(c, "MULTI"); append_cmd_cnt++; @@ -641,7 +653,8 @@ FULL_UPDATE: } if (tbl_mgr) { - int table_id = table_manager_get_table_id(tbl_mgr, s_rule_array[full_idx].table_name); + int table_id = + table_manager_get_table_id(tbl_mgr, s_rule_array[full_idx].table_name); //Unrecognized table. if (table_id < 0) { continue; @@ -662,16 +675,17 @@ FULL_UPDATE: return rule_num ; } -static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, - int rule_num, int print_fn, struct log_handle *logger) +static void +get_foreign_conts(redisContext *c, struct serial_rule *rule_list, + int rule_num, int print_fn, struct log_handle *logger) { int i = 0; int j = 0; UNUSED int ret = 0; int key_num = 0; struct serial_rule *s_rule = NULL; - struct foreign_conts_track *track = ALLOC(struct foreign_conts_track, - rule_num * MAX_FOREIGN_CLMN_NUM); + struct foreign_conts_track *track = + ALLOC(struct foreign_conts_track, rule_num * MAX_FOREIGN_CLMN_NUM); for (i = 0; i < rule_num; i++) { s_rule = rule_list + i; @@ -689,7 +703,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, if (ret == -1) { log_fatal(logger, MODULE_REDIS_MONITOR, "[%s:%d] Foreign content file %s remove failed", - __FUNCTION__, __LINE__, rule_list[i].f_keys[j].filename); + __FUNCTION__, __LINE__, + rule_list[i].f_keys[j].filename); } } } else { @@ -705,7 +720,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, } char redis_cmd[256] = {0}; - snprintf(redis_cmd, sizeof(redis_cmd), "GET %s", s_rule->f_keys[j].key); + snprintf(redis_cmd, sizeof(redis_cmd), "GET %s", + s_rule->f_keys[j].key); ret = redisAppendCommand(c, redis_cmd); track[key_num].rule_idx = i; track[key_num].foreign_idx = j; @@ -720,8 +736,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, ret = maat_wrap_redis_get_reply(c, &reply); if (ret == REDIS_ERR) { log_fatal(logger, MODULE_REDIS_MONITOR, - "[%s:%d] Get %s,%lld foreign key %s content failed, redis server error", - __FUNCTION__, __LINE__, + "[%s:%d] Get %s,%lld foreign key %s content failed," + " redis server error", __FUNCTION__, __LINE__, rule_list[track[i].rule_idx].table_name, rule_list[track[i].rule_idx].rule_id, rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key); @@ -741,8 +757,9 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, FILE *fp = fopen(s_rule->f_keys[track[i].foreign_idx].filename, "w"); if (NULL == fp) { log_fatal(logger, MODULE_REDIS_MONITOR, - "[%s:%d] Write foreign content failed: fopen %s error", __FUNCTION__, - __LINE__, s_rule->f_keys[track[i].foreign_idx].filename); + "[%s:%d] Write foreign content failed: fopen %s error", + __FUNCTION__, __LINE__, + s_rule->f_keys[track[i].foreign_idx].filename); } else { fwrite(reply->str, 1, reply->len, fp); fclose(fp); @@ -762,15 +779,18 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, return; } -void maat_get_foreign_conts(redisContext *c, struct serial_rule *rule_list, - int rule_num, int print_fn, struct log_handle *logger) +void maat_get_foreign_conts(redisContext *c, + struct serial_rule *rule_list, + int rule_num, int print_fn, + struct log_handle *logger) { int max_redis_batch = 4096; int success_cnt = 0; while (success_cnt < rule_num) { int batch_cnt = MIN(rule_num - success_cnt, max_redis_batch); - _get_foreign_conts(c, rule_list + success_cnt, batch_cnt, print_fn, logger); + get_foreign_conts(c, rule_list + success_cnt, batch_cnt, + print_fn, logger); success_cnt += batch_cnt; } } @@ -806,9 +826,10 @@ void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule) for (i = 0; i < s_rule->n_foreign; i++) { int origin_column_size = 0; - const char *origin_column = maat_cmd_find_Nth_column(s_rule->table_line, - s_rule->f_keys[i].column, - &origin_column_size); + const char *origin_column = + maat_cmd_find_Nth_column(s_rule->table_line, s_rule->f_keys[i].column, + &origin_column_size); + strncat(pos_rewrite_line, pos_origin_line, origin_column - pos_origin_line); pos_rewrite_line += origin_column - pos_origin_line; pos_origin_line = origin_column+origin_column_size; @@ -839,8 +860,8 @@ static int redlock_try_lock(redisContext *c, const char *lock_name, { int ret = 0; - redisReply *reply = maat_wrap_redis_command(c, "SET %s locked NX PX %lld", - lock_name, expire); + redisReply *reply = + maat_wrap_redis_command(c, "SET %s locked NX PX %lld", lock_name, expire); if (reply->type == REDIS_REPLY_NIL) { ret = 0; } else { @@ -853,15 +874,17 @@ static int redlock_try_lock(redisContext *c, const char *lock_name, return ret; } -static long long exec_serial_rule_begin(redisContext* c, size_t rule_num, - size_t renew_rule_num, int *renew_allowed, - long long *transaction_version) +static long long +exec_serial_rule_begin(redisContext* c, size_t rule_num, + size_t renew_rule_num, int *renew_allowed, + long long *transaction_version) { int ret = -1; redisReply *data_reply = NULL; if (renew_rule_num > 0) { - while (0 == redlock_try_lock(c, mr_expire_lock, mr_expire_lock_timeout_ms)) { + while (0 == redlock_try_lock(c, mr_expire_lock, + mr_expire_lock_timeout_ms)) { usleep(1000); } *renew_allowed = 1; @@ -903,9 +926,10 @@ const char* lua_exec_done= "redis.call(\'del\', KEYS[4]);" "redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);" "return maat_version;"; -static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction_list, - long long server_time, int renew_allowed, - struct expected_reply *expect_reply, size_t *cnt) +static redisReply * +exec_serial_rule_end(redisContext *c, const char *transaction_list, + long long server_time, int renew_allowed, + struct expected_reply *expect_reply, size_t *cnt) { redisReply *data_reply = NULL; @@ -916,9 +940,10 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction } if (strlen(transaction_list) > 0) { - data_reply = maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld", - lua_exec_done, mr_status_sset, mr_version_sset, - transaction_list, server_time); + data_reply = + maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld", + lua_exec_done, mr_status_sset, mr_version_sset, + transaction_list, server_time); freeReplyObject(data_reply); data_reply = NULL; expected_reply_add(expect_reply + *cnt, -1, REDIS_REPLY_INTEGER, 0); @@ -930,10 +955,11 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction return data_reply; } -static void exec_serial_rule(redisContext *c, const char *transaction_list, - struct serial_rule *s_rule, size_t rule_num, - struct expected_reply *expect_reply, size_t *cnt, - size_t offset, int renew_allowed) +static void +exec_serial_rule(redisContext *c, const char *transaction_list, + struct serial_rule *s_rule, size_t rule_num, + struct expected_reply *expect_reply, size_t *cnt, + size_t offset, int renew_allowed) { size_t i = 0; size_t append_cmd_cnt = 0; @@ -947,7 +973,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list, s_rule[i].table_name, s_rule[i].rule_id, s_rule[i].table_line); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0); + expected_reply_add(expect_reply+*cnt, i+offset, + REDIS_REPLY_STATUS, 0); (*cnt)++; append_cmd_cnt++; //Allowing add duplicated members for rule id recycling. @@ -964,8 +991,10 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list, s_rule[i].timeout, s_rule[i].table_name, s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0); + expected_reply_add(expect_reply+*cnt, i+offset, + REDIS_REPLY_INTEGER, 1); + expected_reply_add(expect_reply+*cnt, i+offset, + REDIS_REPLY_INTEGER, 0); (*cnt)++; append_cmd_cnt++; } @@ -978,7 +1007,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list, mr_key_prefix[MAAT_OP_DEL], s_rule[i].table_name, s_rule[i].rule_id); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0); + expected_reply_add(expect_reply+*cnt, i+offset, + REDIS_REPLY_STATUS, 0); (*cnt)++; append_cmd_cnt++; @@ -987,7 +1017,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list, s_rule[i].table_name, s_rule[i].rule_id, MAAT_REDIS_SYNC_TIME); - expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); + expected_reply_add(expect_reply+*cnt, i+offset, + REDIS_REPLY_INTEGER, 1); (*cnt)++; append_cmd_cnt++; @@ -1046,8 +1077,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list, } } -static int mr_operation_success(redisReply *actual_reply, - struct expected_reply *expected) +static int +mr_operation_success(redisReply *actual_reply, struct expected_reply *expected) { if (expected->possible_replies[0].type != actual_reply->type) { return 0; @@ -1089,7 +1120,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, long long transaction_version = 0; long long transaction_finished_version = 0; size_t max_multi_cmd_num = MAX_REDIS_OP_PER_SRULE * serial_rule_num + 2; // 2 for operation in exec_serial_rule_end() - struct expected_reply *expected_reply = ALLOC(struct expected_reply, max_multi_cmd_num); + struct expected_reply *expected_reply = + ALLOC(struct expected_reply, max_multi_cmd_num); for (i = 0; i < serial_rule_num; i++) { if (s_rule[i].op == MAAT_OP_RENEW_TIMEOUT) { @@ -1097,8 +1129,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, } } - int ret = exec_serial_rule_begin(c, serial_rule_num, renew_num, &renew_allowed, - &transaction_version); + int ret = exec_serial_rule_begin(c, serial_rule_num, renew_num, + &renew_allowed, &transaction_version); //Preconditions for transaction are not satisfied. if (ret != 0) { success_cnt = -1; @@ -1106,8 +1138,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, } if (transaction_version > 0) { - snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", - transaction_version); + snprintf(transaction_list, sizeof(transaction_list), + "MAAT_TRANSACTION_%lld", transaction_version); } while (success_cnt < serial_rule_num) { @@ -1118,8 +1150,9 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, success_cnt+=batch_cnt; } - transaction_reply = exec_serial_rule_end(c, transaction_list, server_time, renew_allowed, - expected_reply, &multi_cmd_cnt); + transaction_reply = exec_serial_rule_end(c, transaction_list, server_time, + renew_allowed, expected_reply, + &multi_cmd_cnt); if (transaction_reply->type != REDIS_REPLY_NIL) { assert(transaction_reply->elements == multi_cmd_cnt); for (i = 0; i < multi_cmd_cnt; i++) { @@ -1135,9 +1168,11 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, rule_seq = expected_reply[i].s_rule_seq; log_fatal(logger, MODULE_REDIS_MONITOR, - "[%s:%d] %s %s %lld failed, rule id maybe conflict or not exist", - __FUNCTION__, __LINE__, mr_op_str[s_rule[rule_seq].op], - s_rule[rule_seq].table_name, s_rule[rule_seq].rule_id); + "[%s:%d] %s %s %lld failed, rule id maybe conflict" + " or not exist", __FUNCTION__, __LINE__, + mr_op_str[s_rule[rule_seq].op], + s_rule[rule_seq].table_name, + s_rule[rule_seq].rule_id); success_cnt--; last_failed = rule_seq; } @@ -1146,7 +1181,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, } if (transaction_version > 0) { - transaction_finished_version = maat_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]); + transaction_finished_version = + maat_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]); log_info(logger, MODULE_REDIS_MONITOR, "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld", transaction_version, transaction_finished_version); @@ -1176,7 +1212,8 @@ error_out: return success_cnt; } -static void cleanup_update_status(redisContext *c, struct log_handle *logger) +static void +cleanup_update_status(redisContext *c, struct log_handle *logger) { long long version_upper_bound = 0; long long version_lower_bound = 0; @@ -1225,21 +1262,24 @@ static void cleanup_update_status(redisContext *c, struct log_handle *logger) } version_lower_bound = maat_read_redis_integer(sub_reply->element[0]); - version_upper_bound = maat_read_redis_integer(sub_reply->element[sub_reply->elements-1]); + version_upper_bound = + maat_read_redis_integer(sub_reply->element[sub_reply->elements-1]); + freeReplyObject(reply); reply = NULL; //To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally. reply = maat_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld", - mr_status_sset, version_lower_bound, - version_upper_bound); + mr_status_sset, version_lower_bound, + version_upper_bound); entry_num = maat_read_redis_integer(reply); freeReplyObject(reply); reply = NULL; log_info(logger, MODULE_REDIS_MONITOR, - "Clean up update status from version %lld to %lld (%lld versions, %lld entries)", - version_lower_bound, version_upper_bound, version_num, entry_num); + "Clean up update status from version %lld to %lld " + "(%lld versions, %lld entries)", version_lower_bound, + version_upper_bound, version_num, entry_num); return; error_out: @@ -1247,7 +1287,8 @@ error_out: reply = NULL; } -static void check_maat_expiration(redisContext *c, struct log_handle *logger) +static void +check_maat_expiration(redisContext *c, struct log_handle *logger) { UNUSED int ret = 0; @@ -1256,9 +1297,11 @@ static void check_maat_expiration(redisContext *c, struct log_handle *logger) return; } - redisReply *data_reply= maat_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld", - mr_expire_sset, server_time); - if (data_reply->type != REDIS_REPLY_ARRAY || 0 == data_reply->elements) { + redisReply *data_reply = + maat_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld", + mr_expire_sset, server_time); + if (data_reply->type != REDIS_REPLY_ARRAY || + 0 == data_reply->elements) { freeReplyObject(data_reply); data_reply = NULL; return; @@ -1276,10 +1319,12 @@ static void check_maat_expiration(redisContext *c, struct log_handle *logger) freeReplyObject(data_reply); data_reply = NULL; - int success_cnt = maat_cmd_write_rule(c, s_rule, s_rule_num, server_time, logger); + int success_cnt = maat_cmd_write_rule(c, s_rule, s_rule_num, + server_time, logger); if (success_cnt < 0) { - log_fatal(logger, MODULE_REDIS_MONITOR, "[%s:%d] maat_cmd_write_rule failed.", - __FUNCTION__, __LINE__); + log_fatal(logger, MODULE_REDIS_MONITOR, + "[%s:%d] maat_cmd_write_rule failed.", + __FUNCTION__, __LINE__); } else if (success_cnt == (int)s_rule_num) { log_info(logger, MODULE_REDIS_MONITOR, "Succesfully expired %zu rules in Redis", s_rule_num); @@ -1348,7 +1393,6 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx, int rule_num = maat_get_rm_key_list(mr_ctx->read_ctx, version, &new_version, maat_inst->tbl_mgr, &rule_list, &update_type, - maat_inst->opts.cumulative_update_off, maat_inst->logger); //redis communication error if (rule_num < 0) { @@ -1370,8 +1414,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx, redisFree(mr_ctx->read_ctx); mr_ctx->read_ctx = NULL; log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR, - "[%s:%d] Get Redis value failed, abandon update and close connection", - __FUNCTION__, __LINE__); + "[%s:%d] Get Redis value failed, abandon update" + " and close connection", __FUNCTION__, __LINE__); goto clean_up; } @@ -1383,7 +1427,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx, if (empty_value_num == rule_num) { log_info(maat_inst->logger, MODULE_REDIS_MONITOR, - "All %d rules are empty, abandon update", empty_value_num); + "All %d rules are empty, abandon update", + empty_value_num); goto clean_up; } @@ -1406,7 +1451,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx, continue; } - table_id = table_manager_get_table_id(maat_inst->tbl_mgr, rule_list[i].table_name); + table_id = table_manager_get_table_id(maat_inst->tbl_mgr, + rule_list[i].table_name); //Unrecognized table. if (table_id < 0) { no_table_num++; @@ -1414,7 +1460,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx, } if (rule_list[i].op == MAAT_OP_DEL) { - valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr, table_id); + valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr, + table_id); ret = validate_line(rule_list[i].table_line, valid_column); if (ret < 0) { log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR, @@ -1436,8 +1483,9 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx, if (call_update_num < rule_num) { log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR, - "[%s:%d] Load %d entries to match engine, no table: %d, empty value: %d", - __FUNCTION__, __LINE__, call_update_num, no_table_num, empty_value_num); + "[%s:%d] Load %d entries to match engine, " + "no table: %d, empty value: %d", __FUNCTION__, __LINE__, + call_update_num, no_table_num, empty_value_num); } clean_up: diff --git a/tools/maat_redis_tool.cpp b/tools/maat_redis_tool.cpp index b63245f..363d903 100644 --- a/tools/maat_redis_tool.cpp +++ b/tools/maat_redis_tool.cpp @@ -83,7 +83,7 @@ void read_rule_from_redis(redisContext *c, const char *output_path, struct log_h FILE *index_fp = NULL; struct serial_rule *rule_list = NULL; - int rule_num = maat_get_rm_key_list(c, 0, &version, NULL, &rule_list, &update_type, 0, logger); + int rule_num = maat_get_rm_key_list(c, 0, &version, NULL, &rule_list, &update_type, logger); if (0 == rule_num) { printf("No Effective Rules.\n"); return;