[PATCH]delete useless cumulative logic

This commit is contained in:
liuwentan
2024-02-21 19:02:13 +08:00
parent 7e159477ac
commit 26d642bdcf
4 changed files with 189 additions and 142 deletions

View File

@@ -81,7 +81,7 @@ void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule);
int maat_get_rm_key_list(redisContext *c, long long instance_version, int maat_get_rm_key_list(redisContext *c, long long instance_version,
long long *new_version, struct table_manager *tbl_mgr, long long *new_version, struct table_manager *tbl_mgr,
struct serial_rule **list, int *update_type, struct serial_rule **list, int *update_type,
int cumulative_off, struct log_handle *logger); struct log_handle *logger);
void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx, void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
void (*start_fn)(long long, int, void *), void (*start_fn)(long long, int, void *),

View File

@@ -124,7 +124,6 @@ struct maat_options {
int deferred_load_on; int deferred_load_on;
int maat_json_is_gzipped; int maat_json_is_gzipped;
int cumulative_update_off; //Default: cumulative update on
int gc_timeout_ms; int gc_timeout_ms;
int rule_effect_interval_ms; int rule_effect_interval_ms;

View File

@@ -50,8 +50,9 @@ struct expected_reply {
redisReply possible_replies[REDIS_REPLY_SIZE]; redisReply possible_replies[REDIS_REPLY_SIZE];
}; };
static char *get_foreign_cont_filename(const char *table_name, long long rule_id, static char *
const char *foreign_key, const char *dir) get_foreign_cont_filename(const char *table_name, long long rule_id,
const char *foreign_key, const char *dir)
{ {
char buffer[512] = {0}; char buffer[512] = {0};
@@ -63,7 +64,8 @@ static char *get_foreign_cont_filename(const char *table_name, long long rule_id
return filename; return filename;
} }
static const char *maat_cmd_find_Nth_column(const char *line, int Nth, int *column_len) static const char *
maat_cmd_find_Nth_column(const char *line, int Nth, int *column_len)
{ {
size_t i = 0; size_t i = 0;
int j = 0; int j = 0;
@@ -99,16 +101,17 @@ static const char *maat_cmd_find_Nth_column(const char *line, int Nth, int *colu
return line + start; return line + start;
} }
static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, static void
int n_foreign, const char *dir, struct log_handle *logger) get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
int n_foreign, const char *dir, struct log_handle *logger)
{ {
int foreign_key_size = 0; int foreign_key_size = 0;
p_rule->f_keys = ALLOC(struct foreign_key, n_foreign); p_rule->f_keys = ALLOC(struct foreign_key, n_foreign);
for (int i = 0; i < n_foreign; i++) { for (int i = 0; i < n_foreign; i++) {
const char *p_foreign = maat_cmd_find_Nth_column(p_rule->table_line, const char *p_foreign =
foreign_columns[i], maat_cmd_find_Nth_column(p_rule->table_line, foreign_columns[i],
&foreign_key_size); &foreign_key_size);
if (NULL == p_foreign) { if (NULL == p_foreign) {
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Get %s,%lld foreign keys failed: No %dth column", "[%s:%d] Get %s,%lld foreign keys failed: No %dth column",
@@ -122,10 +125,12 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
continue; continue;
} }
if (0 != strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) { if (0 != strncmp(p_foreign, foreign_source_prefix,
strlen(foreign_source_prefix))) {
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Get %s,%lld foreign key failed: Invalid source prefix %s", "[%s:%d] Get %s,%lld foreign key failed: "
__FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id, p_foreign); "Invalid source prefix %s", __FUNCTION__, __LINE__,
p_rule->table_name, p_rule->rule_id, p_foreign);
continue; continue;
} }
@@ -133,18 +138,19 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
foreign_key_size = foreign_key_size - strlen(foreign_source_prefix); foreign_key_size = foreign_key_size - strlen(foreign_source_prefix);
p_foreign += strlen(foreign_source_prefix); p_foreign += strlen(foreign_source_prefix);
if (0 != strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix))) { if (0 != strncmp(p_foreign, foreign_key_prefix,
strlen(foreign_key_prefix))) {
log_info(logger, MODULE_REDIS_MONITOR, log_info(logger, MODULE_REDIS_MONITOR,
"[%s:%d] %s, %lld foreign key prefix %s is not recommended", "[%s:%d] %s, %lld foreign key prefix %s is not recommended",
__FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id, p_foreign); __FUNCTION__, __LINE__, p_rule->table_name, p_rule->rule_id,
p_foreign);
} }
p_rule->f_keys[p_rule->n_foreign].key = ALLOC(char, foreign_key_size + 1); p_rule->f_keys[p_rule->n_foreign].key = ALLOC(char, foreign_key_size + 1);
memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size); memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size);
p_rule->f_keys[p_rule->n_foreign].filename = get_foreign_cont_filename(p_rule->table_name, p_rule->f_keys[p_rule->n_foreign].filename =
p_rule->rule_id, get_foreign_cont_filename(p_rule->table_name, p_rule->rule_id,
p_rule->f_keys[p_rule->n_foreign].key, p_rule->f_keys[p_rule->n_foreign].key, dir);
dir);
p_rule->n_foreign++; p_rule->n_foreign++;
} }
@@ -153,8 +159,9 @@ static void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
} }
} }
static int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list, static int
int rule_num, struct maat *maat_inst, const char *dir) get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list,
int rule_num, struct maat *maat_inst, const char *dir)
{ {
int rule_with_foreign_key = 0; int rule_with_foreign_key = 0;
@@ -163,29 +170,37 @@ static int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_l
continue; continue;
} }
int table_id = table_manager_get_table_id(maat_inst->tbl_mgr, rule_list[i].table_name); int table_id = table_manager_get_table_id(maat_inst->tbl_mgr,
rule_list[i].table_name);
void *schema = table_manager_get_schema(maat_inst->tbl_mgr, table_id); void *schema = table_manager_get_schema(maat_inst->tbl_mgr, table_id);
enum table_type table_type = table_manager_get_table_type(maat_inst->tbl_mgr, table_id);
enum table_type table_type =
table_manager_get_table_type(maat_inst->tbl_mgr, table_id);
if (!schema || table_type != TABLE_TYPE_PLUGIN) { if (!schema || table_type != TABLE_TYPE_PLUGIN) {
continue; continue;
} }
int foreign_columns[8]; int foreign_columns[8];
int n_foreign_column = plugin_table_get_foreign_column((struct plugin_schema *)schema, int n_foreign_column =
foreign_columns); plugin_table_get_foreign_column((struct plugin_schema *)schema,
foreign_columns);
if (0 == n_foreign_column) { if (0 == n_foreign_column) {
continue; continue;
} }
_get_foregin_keys(rule_list+i, foreign_columns, n_foreign_column, dir, maat_inst->logger); get_foregin_keys(rule_list+i, foreign_columns, n_foreign_column,
dir, maat_inst->logger);
rule_with_foreign_key++; rule_with_foreign_key++;
} }
return rule_with_foreign_key; return rule_with_foreign_key;
} }
int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_list, int maat_get_foreign_keys_by_prefix(redisContext *ctx,
int rule_num, const char* dir, struct log_handle *logger) struct serial_rule *rule_list,
int rule_num, const char* dir,
struct log_handle *logger)
{ {
int j = 0; int j = 0;
int foreign_key_size = 0; int foreign_key_size = 0;
@@ -199,9 +214,13 @@ int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_
j = 1; j = 1;
n_foreign = 0; n_foreign = 0;
do { do {
p_foreign = maat_cmd_find_Nth_column(rule_list[i].table_line, j, &foreign_key_size); p_foreign = maat_cmd_find_Nth_column(rule_list[i].table_line, j,
if (p_foreign != NULL && foreign_key_size > (int)strlen(foreign_source_prefix) && &foreign_key_size);
0 == strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) {
if (p_foreign != NULL &&
foreign_key_size > (int)strlen(foreign_source_prefix) &&
0 == strncmp(p_foreign, foreign_source_prefix,
strlen(foreign_source_prefix))) {
foreign_columns[n_foreign] = j; foreign_columns[n_foreign] = j;
n_foreign++; n_foreign++;
} }
@@ -209,7 +228,8 @@ int maat_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_
} while (p_foreign != NULL && n_foreign < MAX_FOREIGN_CLMN_NUM); } while (p_foreign != NULL && n_foreign < MAX_FOREIGN_CLMN_NUM);
if (n_foreign > 0) { if (n_foreign > 0) {
_get_foregin_keys(rule_list+i, foreign_columns, n_foreign, dir, logger); get_foregin_keys(rule_list+i, foreign_columns, n_foreign,
dir, logger);
rule_with_foreign_key++; rule_with_foreign_key++;
} }
} }
@@ -222,8 +242,9 @@ struct foreign_conts_track {
int foreign_idx; int foreign_idx;
}; };
static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, static int
int rule_num, struct log_handle *logger) get_maat_redis_value(redisContext *c, struct serial_rule *rule_list,
int rule_num, struct log_handle *logger)
{ {
int i = 0; int i = 0;
int failed_cnt = 0; int failed_cnt = 0;
@@ -321,8 +342,10 @@ static int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list,
return 0; return 0;
} }
int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list, int maat_get_redis_value(redisContext *c,
int rule_num, int print_process, struct log_handle *logger) struct serial_rule *rule_list,
int rule_num, int print_process,
struct log_handle *logger)
{ {
int max_redis_batch = 4096; int max_redis_batch = 4096;
int success_cnt = 0; int success_cnt = 0;
@@ -330,7 +353,7 @@ int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list,
while (success_cnt < rule_num) { while (success_cnt < rule_num) {
int batch_cnt = MIN(rule_num-success_cnt, max_redis_batch); int batch_cnt = MIN(rule_num-success_cnt, max_redis_batch);
int ret = _get_maat_redis_value(c, rule_list+success_cnt, batch_cnt, logger); int ret = get_maat_redis_value(c, rule_list+success_cnt, batch_cnt, logger);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} else { } else {
@@ -352,15 +375,17 @@ int maat_get_redis_value(redisContext *c, struct serial_rule *rule_list,
return 0; return 0;
} }
static int get_inc_key_list(long long instance_version, long long target_version, static int
redisContext *c, struct serial_rule **list, get_inc_key_list(long long instance_version, long long target_version,
struct log_handle *logger) redisContext *c, struct serial_rule **list,
struct log_handle *logger)
{ {
//Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version. //Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version.
//The elements are considered to be ordered from low to high scores(instance_version). //The elements are considered to be ordered from low to high scores(instance_version).
redisReply *reply = (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld", redisReply *reply =
mr_status_sset, instance_version, (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",
target_version); mr_status_sset, instance_version,
target_version);
if (NULL == reply) { if (NULL == reply) {
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] GET %s failed with a NULL reply, error: %s", "[%s:%d] GET %s failed with a NULL reply, error: %s",
@@ -480,15 +505,15 @@ void maat_set_serial_rule(struct serial_rule *rule, enum maat_operation op,
int maat_get_rm_key_list(redisContext *c, long long instance_version, int maat_get_rm_key_list(redisContext *c, long long instance_version,
long long *new_version, struct table_manager *tbl_mgr, long long *new_version, struct table_manager *tbl_mgr,
struct serial_rule **list, int *update_type, struct serial_rule **list, int *update_type,
int cumulative_off, struct log_handle *logger) struct log_handle *logger)
{ {
int rule_num = 0; int rule_num = 0;
long long target_version = 0;
struct serial_rule *s_rule_array = NULL; struct serial_rule *s_rule_array = NULL;
redisReply *reply = (redisReply *)redisCommand(c, "GET MAAT_VERSION"); redisReply *reply = (redisReply *)redisCommand(c, "GET MAAT_VERSION");
if (reply != NULL) { if (reply != NULL) {
if (reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) { if (reply->type == REDIS_REPLY_NIL ||
reply->type == REDIS_REPLY_ERROR) {
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] GET MAAT_VERSION failed, maybe Redis is busy", "[%s:%d] GET MAAT_VERSION failed, maybe Redis is busy",
__FUNCTION__, __LINE__); __FUNCTION__, __LINE__);
@@ -531,46 +556,33 @@ int maat_get_rm_key_list(redisContext *c, long long instance_version,
goto FULL_UPDATE; goto FULL_UPDATE;
} }
if (redis_version > instance_version && 1 == cumulative_off) { /* redis_version > instance_version */
target_version = instance_version; rule_num = get_inc_key_list(instance_version, redis_version, c,
} else { &s_rule_array, logger);
target_version = redis_version - 1; if (rule_num < 0) {
goto FULL_UPDATE;
} }
do {
target_version++;
rule_num = get_inc_key_list(instance_version, target_version,
c, &s_rule_array, logger);
if (rule_num > 0) {
break;
} else if (rule_num < 0) {
goto FULL_UPDATE;
} else {
//ret=0, nothing to do.
}
} while (0 == rule_num && target_version <= redis_version && 1 == cumulative_off);
if (0 == rule_num) { if (0 == rule_num) {
log_info(logger, MODULE_REDIS_MONITOR, log_info(logger, MODULE_REDIS_MONITOR,
"Got nothing after ZRANGEBYSCORE %s (%lld %lld", "Got nothing after ZRANGEBYSCORE %s (%lld %lld",
mr_status_sset, instance_version, target_version - 1); mr_status_sset, instance_version, redis_version);
return 0; return 0;
} }
log_info(logger, MODULE_REDIS_MONITOR, log_info(logger, MODULE_REDIS_MONITOR,
"Inc Update from instance_version %lld to %lld (%d entries)", "Inc Update from instance_version %lld to %lld (%d entries)",
instance_version, target_version, rule_num); instance_version, redis_version, rule_num);
*list = s_rule_array; *list = s_rule_array;
*update_type = MAAT_UPDATE_TYPE_INC; *update_type = MAAT_UPDATE_TYPE_INC;
*new_version = target_version; *new_version = redis_version;
return rule_num; return rule_num;
FULL_UPDATE: FULL_UPDATE:
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"Initiate full update from instance_version %lld to %lld", "Initiate full update from instance_version %lld to %lld",
instance_version, redis_version); instance_version, redis_version);
size_t append_cmd_cnt = 0; size_t append_cmd_cnt = 0;
int ret = redisAppendCommand(c, "MULTI"); int ret = redisAppendCommand(c, "MULTI");
append_cmd_cnt++; append_cmd_cnt++;
@@ -641,7 +653,8 @@ FULL_UPDATE:
} }
if (tbl_mgr) { if (tbl_mgr) {
int table_id = table_manager_get_table_id(tbl_mgr, s_rule_array[full_idx].table_name); int table_id =
table_manager_get_table_id(tbl_mgr, s_rule_array[full_idx].table_name);
//Unrecognized table. //Unrecognized table.
if (table_id < 0) { if (table_id < 0) {
continue; continue;
@@ -662,16 +675,17 @@ FULL_UPDATE:
return rule_num ; return rule_num ;
} }
static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, static void
int rule_num, int print_fn, struct log_handle *logger) get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
int rule_num, int print_fn, struct log_handle *logger)
{ {
int i = 0; int i = 0;
int j = 0; int j = 0;
UNUSED int ret = 0; UNUSED int ret = 0;
int key_num = 0; int key_num = 0;
struct serial_rule *s_rule = NULL; struct serial_rule *s_rule = NULL;
struct foreign_conts_track *track = ALLOC(struct foreign_conts_track, struct foreign_conts_track *track =
rule_num * MAX_FOREIGN_CLMN_NUM); ALLOC(struct foreign_conts_track, rule_num * MAX_FOREIGN_CLMN_NUM);
for (i = 0; i < rule_num; i++) { for (i = 0; i < rule_num; i++) {
s_rule = rule_list + i; s_rule = rule_list + i;
@@ -689,7 +703,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
if (ret == -1) { if (ret == -1) {
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Foreign content file %s remove failed", "[%s:%d] Foreign content file %s remove failed",
__FUNCTION__, __LINE__, rule_list[i].f_keys[j].filename); __FUNCTION__, __LINE__,
rule_list[i].f_keys[j].filename);
} }
} }
} else { } else {
@@ -705,7 +720,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
} }
char redis_cmd[256] = {0}; char redis_cmd[256] = {0};
snprintf(redis_cmd, sizeof(redis_cmd), "GET %s", s_rule->f_keys[j].key); snprintf(redis_cmd, sizeof(redis_cmd), "GET %s",
s_rule->f_keys[j].key);
ret = redisAppendCommand(c, redis_cmd); ret = redisAppendCommand(c, redis_cmd);
track[key_num].rule_idx = i; track[key_num].rule_idx = i;
track[key_num].foreign_idx = j; track[key_num].foreign_idx = j;
@@ -720,8 +736,8 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
ret = maat_wrap_redis_get_reply(c, &reply); ret = maat_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) { if (ret == REDIS_ERR) {
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Get %s,%lld foreign key %s content failed, redis server error", "[%s:%d] Get %s,%lld foreign key %s content failed,"
__FUNCTION__, __LINE__, " redis server error", __FUNCTION__, __LINE__,
rule_list[track[i].rule_idx].table_name, rule_list[track[i].rule_idx].table_name,
rule_list[track[i].rule_idx].rule_id, rule_list[track[i].rule_idx].rule_id,
rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key); rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
@@ -741,8 +757,9 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
FILE *fp = fopen(s_rule->f_keys[track[i].foreign_idx].filename, "w"); FILE *fp = fopen(s_rule->f_keys[track[i].foreign_idx].filename, "w");
if (NULL == fp) { if (NULL == fp) {
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Write foreign content failed: fopen %s error", __FUNCTION__, "[%s:%d] Write foreign content failed: fopen %s error",
__LINE__, s_rule->f_keys[track[i].foreign_idx].filename); __FUNCTION__, __LINE__,
s_rule->f_keys[track[i].foreign_idx].filename);
} else { } else {
fwrite(reply->str, 1, reply->len, fp); fwrite(reply->str, 1, reply->len, fp);
fclose(fp); fclose(fp);
@@ -762,15 +779,18 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
return; return;
} }
void maat_get_foreign_conts(redisContext *c, struct serial_rule *rule_list, void maat_get_foreign_conts(redisContext *c,
int rule_num, int print_fn, struct log_handle *logger) struct serial_rule *rule_list,
int rule_num, int print_fn,
struct log_handle *logger)
{ {
int max_redis_batch = 4096; int max_redis_batch = 4096;
int success_cnt = 0; int success_cnt = 0;
while (success_cnt < rule_num) { while (success_cnt < rule_num) {
int batch_cnt = MIN(rule_num - success_cnt, max_redis_batch); int batch_cnt = MIN(rule_num - success_cnt, max_redis_batch);
_get_foreign_conts(c, rule_list + success_cnt, batch_cnt, print_fn, logger); get_foreign_conts(c, rule_list + success_cnt, batch_cnt,
print_fn, logger);
success_cnt += batch_cnt; success_cnt += batch_cnt;
} }
} }
@@ -806,9 +826,10 @@ void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule)
for (i = 0; i < s_rule->n_foreign; i++) { for (i = 0; i < s_rule->n_foreign; i++) {
int origin_column_size = 0; int origin_column_size = 0;
const char *origin_column = maat_cmd_find_Nth_column(s_rule->table_line, const char *origin_column =
s_rule->f_keys[i].column, maat_cmd_find_Nth_column(s_rule->table_line, s_rule->f_keys[i].column,
&origin_column_size); &origin_column_size);
strncat(pos_rewrite_line, pos_origin_line, origin_column - pos_origin_line); strncat(pos_rewrite_line, pos_origin_line, origin_column - pos_origin_line);
pos_rewrite_line += origin_column - pos_origin_line; pos_rewrite_line += origin_column - pos_origin_line;
pos_origin_line = origin_column+origin_column_size; pos_origin_line = origin_column+origin_column_size;
@@ -839,8 +860,8 @@ static int redlock_try_lock(redisContext *c, const char *lock_name,
{ {
int ret = 0; int ret = 0;
redisReply *reply = maat_wrap_redis_command(c, "SET %s locked NX PX %lld", redisReply *reply =
lock_name, expire); maat_wrap_redis_command(c, "SET %s locked NX PX %lld", lock_name, expire);
if (reply->type == REDIS_REPLY_NIL) { if (reply->type == REDIS_REPLY_NIL) {
ret = 0; ret = 0;
} else { } else {
@@ -853,15 +874,17 @@ static int redlock_try_lock(redisContext *c, const char *lock_name,
return ret; return ret;
} }
static long long exec_serial_rule_begin(redisContext* c, size_t rule_num, static long long
size_t renew_rule_num, int *renew_allowed, exec_serial_rule_begin(redisContext* c, size_t rule_num,
long long *transaction_version) size_t renew_rule_num, int *renew_allowed,
long long *transaction_version)
{ {
int ret = -1; int ret = -1;
redisReply *data_reply = NULL; redisReply *data_reply = NULL;
if (renew_rule_num > 0) { if (renew_rule_num > 0) {
while (0 == redlock_try_lock(c, mr_expire_lock, mr_expire_lock_timeout_ms)) { while (0 == redlock_try_lock(c, mr_expire_lock,
mr_expire_lock_timeout_ms)) {
usleep(1000); usleep(1000);
} }
*renew_allowed = 1; *renew_allowed = 1;
@@ -903,9 +926,10 @@ const char* lua_exec_done=
"redis.call(\'del\', KEYS[4]);" "redis.call(\'del\', KEYS[4]);"
"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);" "redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);"
"return maat_version;"; "return maat_version;";
static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction_list, static redisReply *
long long server_time, int renew_allowed, exec_serial_rule_end(redisContext *c, const char *transaction_list,
struct expected_reply *expect_reply, size_t *cnt) long long server_time, int renew_allowed,
struct expected_reply *expect_reply, size_t *cnt)
{ {
redisReply *data_reply = NULL; redisReply *data_reply = NULL;
@@ -916,9 +940,10 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction
} }
if (strlen(transaction_list) > 0) { if (strlen(transaction_list) > 0) {
data_reply = maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld", data_reply =
lua_exec_done, mr_status_sset, mr_version_sset, maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld",
transaction_list, server_time); lua_exec_done, mr_status_sset, mr_version_sset,
transaction_list, server_time);
freeReplyObject(data_reply); freeReplyObject(data_reply);
data_reply = NULL; data_reply = NULL;
expected_reply_add(expect_reply + *cnt, -1, REDIS_REPLY_INTEGER, 0); expected_reply_add(expect_reply + *cnt, -1, REDIS_REPLY_INTEGER, 0);
@@ -930,10 +955,11 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction
return data_reply; return data_reply;
} }
static void exec_serial_rule(redisContext *c, const char *transaction_list, static void
struct serial_rule *s_rule, size_t rule_num, exec_serial_rule(redisContext *c, const char *transaction_list,
struct expected_reply *expect_reply, size_t *cnt, struct serial_rule *s_rule, size_t rule_num,
size_t offset, int renew_allowed) struct expected_reply *expect_reply, size_t *cnt,
size_t offset, int renew_allowed)
{ {
size_t i = 0; size_t i = 0;
size_t append_cmd_cnt = 0; size_t append_cmd_cnt = 0;
@@ -947,7 +973,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
s_rule[i].table_name, s_rule[i].table_name,
s_rule[i].rule_id, s_rule[i].rule_id,
s_rule[i].table_line); s_rule[i].table_line);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0); expected_reply_add(expect_reply+*cnt, i+offset,
REDIS_REPLY_STATUS, 0);
(*cnt)++; (*cnt)++;
append_cmd_cnt++; append_cmd_cnt++;
//Allowing add duplicated members for rule id recycling. //Allowing add duplicated members for rule id recycling.
@@ -964,8 +991,10 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
s_rule[i].timeout, s_rule[i].timeout,
s_rule[i].table_name, s_rule[i].table_name,
s_rule[i].rule_id); s_rule[i].rule_id);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); expected_reply_add(expect_reply+*cnt, i+offset,
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 0); REDIS_REPLY_INTEGER, 1);
expected_reply_add(expect_reply+*cnt, i+offset,
REDIS_REPLY_INTEGER, 0);
(*cnt)++; (*cnt)++;
append_cmd_cnt++; append_cmd_cnt++;
} }
@@ -978,7 +1007,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
mr_key_prefix[MAAT_OP_DEL], mr_key_prefix[MAAT_OP_DEL],
s_rule[i].table_name, s_rule[i].table_name,
s_rule[i].rule_id); s_rule[i].rule_id);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_STATUS, 0); expected_reply_add(expect_reply+*cnt, i+offset,
REDIS_REPLY_STATUS, 0);
(*cnt)++; (*cnt)++;
append_cmd_cnt++; append_cmd_cnt++;
@@ -987,7 +1017,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
s_rule[i].table_name, s_rule[i].table_name,
s_rule[i].rule_id, s_rule[i].rule_id,
MAAT_REDIS_SYNC_TIME); MAAT_REDIS_SYNC_TIME);
expected_reply_add(expect_reply+*cnt, i+offset, REDIS_REPLY_INTEGER, 1); expected_reply_add(expect_reply+*cnt, i+offset,
REDIS_REPLY_INTEGER, 1);
(*cnt)++; (*cnt)++;
append_cmd_cnt++; append_cmd_cnt++;
@@ -1046,8 +1077,8 @@ static void exec_serial_rule(redisContext *c, const char *transaction_list,
} }
} }
static int mr_operation_success(redisReply *actual_reply, static int
struct expected_reply *expected) mr_operation_success(redisReply *actual_reply, struct expected_reply *expected)
{ {
if (expected->possible_replies[0].type != actual_reply->type) { if (expected->possible_replies[0].type != actual_reply->type) {
return 0; return 0;
@@ -1089,7 +1120,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
long long transaction_version = 0; long long transaction_version = 0;
long long transaction_finished_version = 0; long long transaction_finished_version = 0;
size_t max_multi_cmd_num = MAX_REDIS_OP_PER_SRULE * serial_rule_num + 2; // 2 for operation in exec_serial_rule_end() size_t max_multi_cmd_num = MAX_REDIS_OP_PER_SRULE * serial_rule_num + 2; // 2 for operation in exec_serial_rule_end()
struct expected_reply *expected_reply = ALLOC(struct expected_reply, max_multi_cmd_num); struct expected_reply *expected_reply =
ALLOC(struct expected_reply, max_multi_cmd_num);
for (i = 0; i < serial_rule_num; i++) { for (i = 0; i < serial_rule_num; i++) {
if (s_rule[i].op == MAAT_OP_RENEW_TIMEOUT) { if (s_rule[i].op == MAAT_OP_RENEW_TIMEOUT) {
@@ -1097,8 +1129,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
} }
} }
int ret = exec_serial_rule_begin(c, serial_rule_num, renew_num, &renew_allowed, int ret = exec_serial_rule_begin(c, serial_rule_num, renew_num,
&transaction_version); &renew_allowed, &transaction_version);
//Preconditions for transaction are not satisfied. //Preconditions for transaction are not satisfied.
if (ret != 0) { if (ret != 0) {
success_cnt = -1; success_cnt = -1;
@@ -1106,8 +1138,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
} }
if (transaction_version > 0) { if (transaction_version > 0) {
snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", snprintf(transaction_list, sizeof(transaction_list),
transaction_version); "MAAT_TRANSACTION_%lld", transaction_version);
} }
while (success_cnt < serial_rule_num) { while (success_cnt < serial_rule_num) {
@@ -1118,8 +1150,9 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
success_cnt+=batch_cnt; success_cnt+=batch_cnt;
} }
transaction_reply = exec_serial_rule_end(c, transaction_list, server_time, renew_allowed, transaction_reply = exec_serial_rule_end(c, transaction_list, server_time,
expected_reply, &multi_cmd_cnt); renew_allowed, expected_reply,
&multi_cmd_cnt);
if (transaction_reply->type != REDIS_REPLY_NIL) { if (transaction_reply->type != REDIS_REPLY_NIL) {
assert(transaction_reply->elements == multi_cmd_cnt); assert(transaction_reply->elements == multi_cmd_cnt);
for (i = 0; i < multi_cmd_cnt; i++) { for (i = 0; i < multi_cmd_cnt; i++) {
@@ -1135,9 +1168,11 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
rule_seq = expected_reply[i].s_rule_seq; rule_seq = expected_reply[i].s_rule_seq;
log_fatal(logger, MODULE_REDIS_MONITOR, log_fatal(logger, MODULE_REDIS_MONITOR,
"[%s:%d] %s %s %lld failed, rule id maybe conflict or not exist", "[%s:%d] %s %s %lld failed, rule id maybe conflict"
__FUNCTION__, __LINE__, mr_op_str[s_rule[rule_seq].op], " or not exist", __FUNCTION__, __LINE__,
s_rule[rule_seq].table_name, s_rule[rule_seq].rule_id); mr_op_str[s_rule[rule_seq].op],
s_rule[rule_seq].table_name,
s_rule[rule_seq].rule_id);
success_cnt--; success_cnt--;
last_failed = rule_seq; last_failed = rule_seq;
} }
@@ -1146,7 +1181,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
} }
if (transaction_version > 0) { if (transaction_version > 0) {
transaction_finished_version = maat_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]); transaction_finished_version =
maat_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]);
log_info(logger, MODULE_REDIS_MONITOR, log_info(logger, MODULE_REDIS_MONITOR,
"Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld", "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld",
transaction_version, transaction_finished_version); transaction_version, transaction_finished_version);
@@ -1176,7 +1212,8 @@ error_out:
return success_cnt; return success_cnt;
} }
static void cleanup_update_status(redisContext *c, struct log_handle *logger) static void
cleanup_update_status(redisContext *c, struct log_handle *logger)
{ {
long long version_upper_bound = 0; long long version_upper_bound = 0;
long long version_lower_bound = 0; long long version_lower_bound = 0;
@@ -1225,21 +1262,24 @@ static void cleanup_update_status(redisContext *c, struct log_handle *logger)
} }
version_lower_bound = maat_read_redis_integer(sub_reply->element[0]); version_lower_bound = maat_read_redis_integer(sub_reply->element[0]);
version_upper_bound = maat_read_redis_integer(sub_reply->element[sub_reply->elements-1]); version_upper_bound =
maat_read_redis_integer(sub_reply->element[sub_reply->elements-1]);
freeReplyObject(reply); freeReplyObject(reply);
reply = NULL; reply = NULL;
//To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally. //To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
reply = maat_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld", reply = maat_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld",
mr_status_sset, version_lower_bound, mr_status_sset, version_lower_bound,
version_upper_bound); version_upper_bound);
entry_num = maat_read_redis_integer(reply); entry_num = maat_read_redis_integer(reply);
freeReplyObject(reply); freeReplyObject(reply);
reply = NULL; reply = NULL;
log_info(logger, MODULE_REDIS_MONITOR, log_info(logger, MODULE_REDIS_MONITOR,
"Clean up update status from version %lld to %lld (%lld versions, %lld entries)", "Clean up update status from version %lld to %lld "
version_lower_bound, version_upper_bound, version_num, entry_num); "(%lld versions, %lld entries)", version_lower_bound,
version_upper_bound, version_num, entry_num);
return; return;
error_out: error_out:
@@ -1247,7 +1287,8 @@ error_out:
reply = NULL; reply = NULL;
} }
static void check_maat_expiration(redisContext *c, struct log_handle *logger) static void
check_maat_expiration(redisContext *c, struct log_handle *logger)
{ {
UNUSED int ret = 0; UNUSED int ret = 0;
@@ -1256,9 +1297,11 @@ static void check_maat_expiration(redisContext *c, struct log_handle *logger)
return; return;
} }
redisReply *data_reply= maat_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld", redisReply *data_reply =
mr_expire_sset, server_time); maat_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld",
if (data_reply->type != REDIS_REPLY_ARRAY || 0 == data_reply->elements) { mr_expire_sset, server_time);
if (data_reply->type != REDIS_REPLY_ARRAY ||
0 == data_reply->elements) {
freeReplyObject(data_reply); freeReplyObject(data_reply);
data_reply = NULL; data_reply = NULL;
return; return;
@@ -1276,10 +1319,12 @@ static void check_maat_expiration(redisContext *c, struct log_handle *logger)
freeReplyObject(data_reply); freeReplyObject(data_reply);
data_reply = NULL; data_reply = NULL;
int success_cnt = maat_cmd_write_rule(c, s_rule, s_rule_num, server_time, logger); int success_cnt = maat_cmd_write_rule(c, s_rule, s_rule_num,
server_time, logger);
if (success_cnt < 0) { if (success_cnt < 0) {
log_fatal(logger, MODULE_REDIS_MONITOR, "[%s:%d] maat_cmd_write_rule failed.", log_fatal(logger, MODULE_REDIS_MONITOR,
__FUNCTION__, __LINE__); "[%s:%d] maat_cmd_write_rule failed.",
__FUNCTION__, __LINE__);
} else if (success_cnt == (int)s_rule_num) { } else if (success_cnt == (int)s_rule_num) {
log_info(logger, MODULE_REDIS_MONITOR, log_info(logger, MODULE_REDIS_MONITOR,
"Succesfully expired %zu rules in Redis", s_rule_num); "Succesfully expired %zu rules in Redis", s_rule_num);
@@ -1348,7 +1393,6 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
int rule_num = maat_get_rm_key_list(mr_ctx->read_ctx, version, int rule_num = maat_get_rm_key_list(mr_ctx->read_ctx, version,
&new_version, maat_inst->tbl_mgr, &new_version, maat_inst->tbl_mgr,
&rule_list, &update_type, &rule_list, &update_type,
maat_inst->opts.cumulative_update_off,
maat_inst->logger); maat_inst->logger);
//redis communication error //redis communication error
if (rule_num < 0) { if (rule_num < 0) {
@@ -1370,8 +1414,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
redisFree(mr_ctx->read_ctx); redisFree(mr_ctx->read_ctx);
mr_ctx->read_ctx = NULL; mr_ctx->read_ctx = NULL;
log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR, log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR,
"[%s:%d] Get Redis value failed, abandon update and close connection", "[%s:%d] Get Redis value failed, abandon update"
__FUNCTION__, __LINE__); " and close connection", __FUNCTION__, __LINE__);
goto clean_up; goto clean_up;
} }
@@ -1383,7 +1427,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
if (empty_value_num == rule_num) { if (empty_value_num == rule_num) {
log_info(maat_inst->logger, MODULE_REDIS_MONITOR, log_info(maat_inst->logger, MODULE_REDIS_MONITOR,
"All %d rules are empty, abandon update", empty_value_num); "All %d rules are empty, abandon update",
empty_value_num);
goto clean_up; goto clean_up;
} }
@@ -1406,7 +1451,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
continue; continue;
} }
table_id = table_manager_get_table_id(maat_inst->tbl_mgr, rule_list[i].table_name); table_id = table_manager_get_table_id(maat_inst->tbl_mgr,
rule_list[i].table_name);
//Unrecognized table. //Unrecognized table.
if (table_id < 0) { if (table_id < 0) {
no_table_num++; no_table_num++;
@@ -1414,7 +1460,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
} }
if (rule_list[i].op == MAAT_OP_DEL) { if (rule_list[i].op == MAAT_OP_DEL) {
valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr, table_id); valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr,
table_id);
ret = validate_line(rule_list[i].table_line, valid_column); ret = validate_line(rule_list[i].table_line, valid_column);
if (ret < 0) { if (ret < 0) {
log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR, log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR,
@@ -1436,8 +1483,9 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
if (call_update_num < rule_num) { if (call_update_num < rule_num) {
log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR, log_fatal(maat_inst->logger, MODULE_REDIS_MONITOR,
"[%s:%d] Load %d entries to match engine, no table: %d, empty value: %d", "[%s:%d] Load %d entries to match engine, "
__FUNCTION__, __LINE__, call_update_num, no_table_num, empty_value_num); "no table: %d, empty value: %d", __FUNCTION__, __LINE__,
call_update_num, no_table_num, empty_value_num);
} }
clean_up: clean_up:

View File

@@ -83,7 +83,7 @@ void read_rule_from_redis(redisContext *c, const char *output_path, struct log_h
FILE *index_fp = NULL; FILE *index_fp = NULL;
struct serial_rule *rule_list = NULL; struct serial_rule *rule_list = NULL;
int rule_num = maat_get_rm_key_list(c, 0, &version, NULL, &rule_list, &update_type, 0, logger); int rule_num = maat_get_rm_key_list(c, 0, &version, NULL, &rule_list, &update_type, logger);
if (0 == rule_num) { if (0 == rule_num) {
printf("No Effective Rules.\n"); printf("No Effective Rules.\n");
return; return;