support scan ip

This commit is contained in:
liuwentan
2022-12-09 17:12:18 +08:00
parent 6ba2f6241e
commit 0536083cbe
27 changed files with 1894 additions and 480 deletions

View File

@@ -19,6 +19,8 @@
#include "maat_redis_monitor.h"
#include "maat_table_schema.h"
#define MODULE_REDIS_MONITOR module_name_str("maat.redis_monitor")
const time_t MAAT_REDIS_RECONNECT_INTERVAL_S = 5;
const static int MAAT_REDIS_SYNC_TIME = 30 * 60;
@@ -47,7 +49,8 @@ char *get_foreign_cont_filename(const char *table_name, int rule_id, const char
return filename;
}
void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_foreign, const char *dir)
void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_foreign,
const char *dir, struct log_handle *logger)
{
int foreign_key_size = 0;
p_rule->f_keys = ALLOC(struct foreign_key, n_foreign);
@@ -55,8 +58,8 @@ void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_f
for (int i = 0; i < n_foreign; i++) {
const char *p_foreign = maat_cmd_find_Nth_column(p_rule->table_line, foreign_columns[i], &foreign_key_size);
if (NULL == p_foreign) {
fprintf(stderr, "Get %s,%lu foreign keys failed: No %dth column\n",
p_rule->table_name, p_rule->rule_id, foreign_columns[i]);
log_error(logger, MODULE_REDIS_MONITOR, "Get %s,%lu foreign keys failed: No %dth column",
p_rule->table_name, p_rule->rule_id, foreign_columns[i]);
continue;
}
@@ -66,8 +69,8 @@ void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_f
}
if (0 != strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) {
fprintf(stderr, "Get %s,%lu foreign key failed: Invalid source prefix %s\n",
p_rule->table_name, p_rule->rule_id, p_foreign);
log_error(logger, MODULE_REDIS_MONITOR, "Get %s,%lu foreign key failed: Invalid source prefix %s",
p_rule->table_name, p_rule->rule_id, p_foreign);
continue;
}
@@ -76,8 +79,8 @@ void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_f
p_foreign += strlen(foreign_source_prefix);
if (0 != strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix))) {
fprintf(stdout, "%s, %lu foreign key prefix %s is not recommended\n",
p_rule->table_name, p_rule->rule_id, p_foreign);
log_info(logger, MODULE_REDIS_MONITOR, "%s, %lu foreign key prefix %s is not recommended",
p_rule->table_name, p_rule->rule_id, p_foreign);
}
p_rule->f_keys[p_rule->n_foreign].key = ALLOC(char, foreign_key_size+1);
@@ -114,7 +117,7 @@ int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list, in
continue;
}
_get_foregin_keys(rule_list+i, foreign_columns, n_foreign_column, dir);
_get_foregin_keys(rule_list+i, foreign_columns, n_foreign_column, dir, maat_instance->logger);
rule_with_foreign_key++;
}
@@ -122,7 +125,7 @@ int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list, in
}
int maat_cmd_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *rule_list,
int rule_num, const char* dir)
int rule_num, const char* dir, struct log_handle *logger)
{
int j = 0;
int foreign_key_size = 0;
@@ -146,7 +149,7 @@ int maat_cmd_get_foreign_keys_by_prefix(redisContext *ctx, struct serial_rule *r
} while (p_foreign != NULL && n_foreign < MAX_FOREIGN_CLMN_NUM);
if (n_foreign > 0) {
_get_foregin_keys(rule_list+i, foreign_columns, n_foreign, dir);
_get_foregin_keys(rule_list+i, foreign_columns, n_foreign, dir, logger);
rule_with_foreign_key++;
}
}
@@ -160,7 +163,8 @@ struct foreign_conts_track
int foreign_idx;
};
int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int rule_num)
int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int rule_num,
struct log_handle *logger)
{
int i = 0;
int failed_cnt = 0;
@@ -181,8 +185,9 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
for (i = 0; i < rule_num; i++) {
ret = maat_cmd_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
fprintf(stderr, "Redis GET %s:%s,%lu failed, redis server error\n", mr_key_prefix[rule_list[i].op],
rule_list[i].table_name, rule_list[i].rule_id);
log_error(logger, MODULE_REDIS_MONITOR, "Redis GET %s:%s,%lu failed, redis server error",
mr_key_prefix[rule_list[i].op],
rule_list[i].table_name, rule_list[i].rule_id);
error_happened = 1;
break;
}
@@ -194,8 +199,9 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
retry_ids[failed_cnt] = i;
failed_cnt++;
} else {
fprintf(stderr, "Redis GET %s:%s,%lu failed\n",mr_key_prefix[rule_list[i].op],
rule_list[i].table_name, rule_list[i].rule_id);
log_error(logger, MODULE_REDIS_MONITOR, "Redis GET %s:%s,%lu failed",
mr_key_prefix[rule_list[i].op],
rule_list[i].table_name, rule_list[i].rule_id);
error_happened = 1;
}
}
@@ -222,7 +228,7 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
idx = retry_ids[i];
ret = maat_cmd_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
fprintf(stderr, "redis command %s failed, redis server error\n", redis_cmd);
log_error(logger, MODULE_REDIS_MONITOR, "redis command %s failed, redis server error", redis_cmd);
FREE(retry_ids);
return -1;
}
@@ -231,10 +237,12 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
rule_list[idx].table_line = maat_strdup(reply->str);
} else if(reply->type==REDIS_REPLY_ERROR) {
//Deal with Redis response: "Loading Redis is loading the database in memory"
fprintf(stderr, "redis command %s error, reply type=%d, error str=%s\n", redis_cmd, reply->type, reply->str);
log_error(logger, MODULE_REDIS_MONITOR, "redis command %s error, reply type=%d, error str=%s",
redis_cmd, reply->type, reply->str);
} else {
//Handle type "nil"
fprintf(stderr, "redis command %s failed, reply type=%d\n", redis_cmd, reply->type);
log_error(logger, MODULE_REDIS_MONITOR, "redis command %s failed, reply type=%d",
redis_cmd, reply->type);
}
freeReplyObject(reply);
@@ -246,7 +254,8 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
return 0;
}
int maat_cmd_get_redis_value(redisContext *c, struct serial_rule *rule_list, int rule_num, int print_process)
int maat_cmd_get_redis_value(redisContext *c, struct serial_rule *rule_list, int rule_num,
int print_process, struct log_handle *logger)
{
int max_redis_batch = 4096;
int success_cnt = 0;
@@ -254,7 +263,7 @@ int maat_cmd_get_redis_value(redisContext *c, struct serial_rule *rule_list, int
while (success_cnt < rule_num) {
int batch_cnt = MIN(rule_num-success_cnt, max_redis_batch);
int ret = _get_maat_redis_value(c, rule_list+success_cnt, batch_cnt);
int ret = _get_maat_redis_value(c, rule_list+success_cnt, batch_cnt, logger);
if (ret < 0) {
return -1;
} else {
@@ -277,14 +286,15 @@ int maat_cmd_get_redis_value(redisContext *c, struct serial_rule *rule_list, int
}
int get_inc_key_list(long long instance_version, long long target_version,
redisContext *c, struct serial_rule **list)
redisContext *c, struct serial_rule **list, struct log_handle *logger)
{
//Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version.
//The elements are considered to be ordered from low to high scores(instance_version).
redisReply *reply = (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld", mr_status_sset,
instance_version,target_version);
if (NULL == reply) {
fprintf(stderr, "GET %s failed with a NULL reply, error: %s\n", mr_status_sset, c->errstr);
log_error(logger, MODULE_REDIS_MONITOR, "GET %s failed with a NULL reply, error: %s",
mr_status_sset, c->errstr);
return -1;
}
@@ -299,8 +309,8 @@ int get_inc_key_list(long long instance_version, long long target_version,
redisReply *tmp_reply= maat_cmd_wrap_redis_command(c, "ZSCORE %s %s", mr_status_sset, reply->element[0]->str);
if (tmp_reply->type != REDIS_REPLY_STRING) {
fprintf(stderr, "ZSCORE %s %s failed Version: %lld->%lld\n", mr_status_sset,
reply->element[0]->str, instance_version, target_version);
log_error(logger, MODULE_REDIS_MONITOR, "ZSCORE %s %s failed Version: %lld->%lld",
mr_status_sset, reply->element[0]->str, instance_version, target_version);
freeReplyObject(tmp_reply);
tmp_reply = NULL;
freeReplyObject(reply);
@@ -317,8 +327,8 @@ int get_inc_key_list(long long instance_version, long long target_version,
}
if (nearest_rule_version != instance_version + 1) {
fprintf(stdout, "Noncontinuous VERSION Redis: %lld MAAT: %lld\n",
nearest_rule_version, instance_version);
log_info(logger, MODULE_REDIS_MONITOR, "Noncontinuous VERSION Redis: %lld MAAT: %lld",
nearest_rule_version, instance_version);
}
int i = 0;
@@ -331,7 +341,7 @@ int get_inc_key_list(long long instance_version, long long target_version,
int ret = sscanf(reply->element[i]->str, "%[^,],%[^,],%lu", op_str,
s_rule[j].table_name, &(s_rule[j].rule_id));
if (ret != 3 || s_rule[i].rule_id < 0) {
fprintf(stderr, "Invalid Redis Key: %s\n", reply->element[i]->str);
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key: %s", reply->element[i]->str);
continue;
}
@@ -340,7 +350,7 @@ int get_inc_key_list(long long instance_version, long long target_version,
} else if(strncmp(op_str, "DEL", strlen("DEL")) == 0) {
s_rule[j].op = MAAT_OP_DEL;
} else {
fprintf(stderr, "Invalid Redis Key: %s\n", reply->element[i]->str);
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key: %s", reply->element[i]->str);
continue;
}
j++;
@@ -458,7 +468,8 @@ int recovery_history_version(const struct serial_rule *current, int current_num,
int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long long desired_version,
long long *new_version, struct table_schema_manager* table_schema_mgr,
struct serial_rule **list, int *update_type, int cumulative_off)
struct serial_rule **list, int *update_type, int cumulative_off,
struct log_handle *logger)
{
int rule_num = 0;
long long target_version = 0;
@@ -467,20 +478,20 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
redisReply *reply = (redisReply *)redisCommand(c, "GET MAAT_VERSION");
if (reply != NULL) {
if (reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "GET MAAT_VERSION failed, maybe Redis is busy\n");
log_error(logger, MODULE_REDIS_MONITOR, "GET MAAT_VERSION failed, maybe Redis is busy");
freeReplyObject(reply);
reply = NULL;
return -1;
}
} else {
fprintf(stderr, "GET MAAT_VERSION failed with NULL reply, error: %s\n", c->errstr);
log_error(logger, MODULE_REDIS_MONITOR, "GET MAAT_VERSION failed with NULL reply, error: %s", c->errstr);
return -1;
}
long long redis_version = maat_cmd_read_redis_integer(reply);
if (redis_version < 0) {
if (reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "Redis Communication error: %s\n", reply->str);
log_error(logger, MODULE_REDIS_MONITOR, "Redis Communication error: %s", reply->str);
}
return -1;
}
@@ -497,7 +508,8 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
}
if (redis_version < instance_version) {
fprintf(stderr, "VERSION roll back MAAT: %lld -> Redis: %lld\n", instance_version, redis_version);
log_error(logger, MODULE_REDIS_MONITOR, "VERSION roll back MAAT: %lld -> Redis: %lld",
instance_version, redis_version);
goto FULL_UPDATE;
}
@@ -509,7 +521,7 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
do {
target_version++;
rule_num = get_inc_key_list(instance_version, target_version, c, &s_rule_array);
rule_num = get_inc_key_list(instance_version, target_version, c, &s_rule_array, logger);
if (rule_num > 0) {
break;
} else if (rule_num < 0) {
@@ -521,13 +533,13 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
} while (0 == rule_num && target_version <= redis_version && 1 == cumulative_off);
if (0 == rule_num) {
fprintf(stdout, "Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s\n", mr_status_sset,
instance_version, target_version-1, cumulative_off == 1 ? "OFF" : "ON");
log_info(logger, MODULE_REDIS_MONITOR, "Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s",
mr_status_sset, instance_version, target_version-1, cumulative_off == 1 ? "OFF" : "ON");
return 0;
}
fprintf(stdout, "Inc Update from instance_version %lld to %lld (%d entries)\n",
instance_version, target_version, rule_num);
log_info(logger, MODULE_REDIS_MONITOR, "Inc Update from instance_version %lld to %lld (%d entries)",
instance_version, target_version, rule_num);
*list = s_rule_array;
*update_type = CM_UPDATE_TYPE_INC;
@@ -535,8 +547,8 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
return rule_num;
FULL_UPDATE:
fprintf(stdout, "Initiate full update from instance_version %lld to %lld\n", instance_version,
desired_version == 0 ? redis_version : desired_version);
log_info(logger, MODULE_REDIS_MONITOR, "Initiate full update from instance_version %lld to %lld",
instance_version, desired_version == 0 ? redis_version : desired_version);
size_t append_cmd_cnt = 0;
int ret = redisAppendCommand(c, "MULTI");
append_cmd_cnt++;
@@ -557,12 +569,12 @@ FULL_UPDATE:
reply = maat_cmd_wrap_redis_command(c, "EXEC");
if (NULL == reply) {
fprintf(stderr, "Redis Communication error: %s\n", c->errstr);
log_error(logger, MODULE_REDIS_MONITOR, "Redis Communication error: %s", c->errstr);
return -1;
}
if (reply->type != REDIS_REPLY_ARRAY) {
fprintf(stderr, "Invalid Redis Key List type %d\n", reply->type);
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key List type %d", reply->type);
freeReplyObject(reply);
reply = NULL;
return -1;
@@ -571,7 +583,7 @@ FULL_UPDATE:
*new_version = maat_cmd_read_redis_integer(reply->element[0]);
redisReply *sub_reply = reply->element[1];
if (sub_reply->type != REDIS_REPLY_ARRAY) {
fprintf(stderr, "Invalid Redis Key List type %d\n", sub_reply->type);
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key List type %d", sub_reply->type);
freeReplyObject(reply);
reply = NULL;
return -1;
@@ -581,7 +593,7 @@ FULL_UPDATE:
s_rule_array = ALLOC(struct serial_rule, sub_reply->elements);
for (i = 0, full_idx = 0; i < sub_reply->elements; i++) {
if (sub_reply->element[i]->type != REDIS_REPLY_STRING) {
fprintf(stderr, "Invalid Redis Key Type: %d\n", sub_reply->element[i]->type);
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key Type: %d", sub_reply->element[i]->type);
continue;
}
@@ -591,7 +603,7 @@ FULL_UPDATE:
s_rule_array[full_idx].op = MAAT_OP_ADD;
if (ret != 2 || s_rule_array[full_idx].rule_id < 0 || strlen(s_rule_array[full_idx].table_name) == 0) {
fprintf(stderr, "Invalid Redis Key Format: %s\n", sub_reply->element[i]->str);
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key Format: %s", sub_reply->element[i]->str);
continue;
}
@@ -611,13 +623,13 @@ FULL_UPDATE:
if (desired_version != 0) {
struct serial_rule *changed_rule_array = NULL;
int changed_rule_num = get_inc_key_list(desired_version, redis_version, c, &changed_rule_array);
int changed_rule_num = get_inc_key_list(desired_version, redis_version, c, &changed_rule_array, logger);
if (changed_rule_num < 0) {
fprintf(stderr, "Recover history version %lld faild where as redis version is %lld\n",
desired_version, redis_version);
log_error(logger, MODULE_REDIS_MONITOR, "Recover history version %lld faild where as redis version is %lld",
desired_version, redis_version);
} else if(0 == changed_rule_num) {
fprintf(stderr, "Nothing to recover from history version %lld to redis version is %lld\n",
desired_version, redis_version);
log_error(logger, MODULE_REDIS_MONITOR, "Nothing to recover from history version %lld to redis version is %lld",
desired_version, redis_version);
} else {
struct serial_rule *history_rule_array = NULL;
ret = recovery_history_version(s_rule_array, full_idx, changed_rule_array, changed_rule_num, &history_rule_array);
@@ -626,8 +638,8 @@ FULL_UPDATE:
s_rule_array = history_rule_array;
rule_num = ret;
*new_version = desired_version;
fprintf(stdout, "Successfully recovered from history version %lld to redis version is %lld\n",
desired_version, redis_version);
log_info(logger, MODULE_REDIS_MONITOR, "Successfully recovered from history version %lld to redis version is %lld",
desired_version, redis_version);
}
}
FREE(changed_rule_array);
@@ -635,12 +647,13 @@ FULL_UPDATE:
*list = s_rule_array;
*update_type = CM_UPDATE_TYPE_FULL;
fprintf(stdout, "Full update %d keys of version %lld\n", rule_num, *new_version);
log_info(logger, MODULE_REDIS_MONITOR, "Full update %d keys of version %lld", rule_num, *new_version);
return rule_num ;
}
void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule_num, int print_fn)
void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule_num, int print_fn,
struct log_handle *logger)
{
int i = 0;
int j = 0;
@@ -663,8 +676,8 @@ void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule
ret = remove(rule_list[i].f_keys[j].filename);
if (ret == -1) {
fprintf(stderr, "Foreign content file %s remove failed\n",
rule_list[i].f_keys[j].filename);
log_error(logger, MODULE_REDIS_MONITOR, "Foreign content file %s remove failed",
rule_list[i].f_keys[j].filename);
}
}
} else {
@@ -694,25 +707,28 @@ void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule
for (i = 0; i < key_num; i++) {
ret = maat_cmd_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
fprintf(stderr, "Get %s,%lu foreign key %s content failed, redis server error\n",
rule_list[track[i].rule_idx].table_name,
rule_list[track[i].rule_idx].rule_id,
rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
log_error(logger, MODULE_REDIS_MONITOR,
"Get %s,%lu foreign key %s content failed, redis server error",
rule_list[track[i].rule_idx].table_name,
rule_list[track[i].rule_idx].rule_id,
rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
break;
}
if (reply->type != REDIS_REPLY_STRING) {
fprintf(stderr, "Get %s,%lu foreign key %s content failed\n",
rule_list[track[i].rule_idx].table_name,
rule_list[track[i].rule_idx].rule_id,
rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
log_error(logger, MODULE_REDIS_MONITOR,
"Get %s,%lu foreign key %s content failed",
rule_list[track[i].rule_idx].table_name,
rule_list[track[i].rule_idx].rule_id,
rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
continue;
} else {
s_rule = rule_list+track[i].rule_idx;
FILE *fp = fopen(s_rule->f_keys[track[i].foreign_idx].filename, "w");
if (NULL == fp) {
fprintf(stderr, "Write foreign content failed: fopen %s error\n",
s_rule->f_keys[track[i].foreign_idx].filename);
log_error(logger, MODULE_REDIS_MONITOR,
"Write foreign content failed: fopen %s error",
s_rule->f_keys[track[i].foreign_idx].filename);
} else {
fwrite(reply->str, 1, reply->len, fp);
fclose(fp);
@@ -731,14 +747,15 @@ void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule
return;
}
void maat_cmd_get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule_num, int print_fn)
void maat_cmd_get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule_num, int print_fn,
struct log_handle *logger)
{
int max_redis_batch = 4096;
int success_cnt = 0;
while (success_cnt < rule_num) {
int batch_cnt = MIN(rule_num - success_cnt, max_redis_batch);
_get_foreign_conts(c, rule_list + success_cnt, batch_cnt, print_fn);
_get_foreign_conts(c, rule_list + success_cnt, batch_cnt, print_fn, logger);
success_cnt += batch_cnt;
}
}
@@ -1036,7 +1053,8 @@ int mr_operation_success(redisReply *actual_reply, struct expected_reply *expect
return 0;
}
int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, size_t serial_rule_num, long long server_time)
int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, size_t serial_rule_num,
long long server_time, struct log_handle *logger)
{
size_t i = 0;
size_t rule_seq = 0;
@@ -1085,16 +1103,20 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, size_t seri
assert(transaction_reply->elements == multi_cmd_cnt);
for (i = 0; i < multi_cmd_cnt; i++) {
p = transaction_reply->element[i];
//failed is acceptable
//or transaciton is success
//failed is acceptable
//or transaciton is success
//or continuation of last failed
if (expected_reply[i].s_rule_seq == -1 || 1 == mr_operation_success(p, expected_reply+i) || last_failed == expected_reply[i].s_rule_seq) {
if (expected_reply[i].s_rule_seq == -1 ||
1 == mr_operation_success(p, expected_reply+i) ||
last_failed == expected_reply[i].s_rule_seq) {
continue;
}
rule_seq = expected_reply[i].s_rule_seq;
fprintf(stderr, "%s %s %lu failed, rule id maybe conflict or not exist\n",
mr_op_str[s_rule[rule_seq].op], s_rule[rule_seq].table_name,
s_rule[rule_seq].rule_id);
log_error(logger, MODULE_REDIS_MONITOR,
"%s %s %lu failed, rule id maybe conflict or not exist",
mr_op_str[s_rule[rule_seq].op], s_rule[rule_seq].table_name,
s_rule[rule_seq].rule_id);
success_cnt--;
last_failed = rule_seq;
}
@@ -1103,9 +1125,10 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, size_t seri
}
if (transaction_version > 0) {
transaction_finished_version = maat_cmd_read_redis_integer(transaction_reply->element[multi_cmd_cnt-1]);
fprintf(stdout, "Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld\n",
transaction_version, transaction_finished_version);
transaction_finished_version = maat_cmd_read_redis_integer(transaction_reply->element[multi_cmd_cnt - 1]);
log_info(logger, MODULE_REDIS_MONITOR,
"Redis transaction MAAT_PRE_VER = %lld , MAAT_VERSION = %lld",
transaction_version, transaction_finished_version);
}
freeReplyObject(transaction_reply);
@@ -1115,9 +1138,8 @@ error_out:
if (renew_num > 0 && renew_allowed != 1) {
for (i = 0; i < (unsigned int)serial_rule_num; i++) {
if (s_rule[i].op == MAAT_OP_RENEW_TIMEOUT) {
fprintf(stdout, "%s %s %lu is not allowed due to lock contention\n",
mr_op_str[MAAT_OP_RENEW_TIMEOUT], s_rule[i].table_name,
s_rule[i].rule_id);
log_error(logger, MODULE_REDIS_MONITOR, "%s %s %lu is not allowed due to lock contention",
mr_op_str[MAAT_OP_RENEW_TIMEOUT], s_rule[i].table_name, s_rule[i].rule_id);
}
}
@@ -1131,7 +1153,7 @@ error_out:
return success_cnt;
}
void cleanup_update_status(redisContext *c)
void cleanup_update_status(redisContext *c, struct log_handle *logger)
{
long long version_upper_bound = 0;
long long version_lower_bound = 0;
@@ -1191,18 +1213,17 @@ void cleanup_update_status(redisContext *c)
freeReplyObject(reply);
reply = NULL;
fprintf(stdout, "Clean up update status from version %lld to %lld (%lld versions, %lld entries)\n",
version_lower_bound, version_upper_bound, version_num, entry_num);
log_info(logger, MODULE_REDIS_MONITOR,
"Clean up update status from version %lld to %lld (%lld versions, %lld entries)",
version_lower_bound, version_upper_bound, version_num, entry_num);
return;
error_out:
freeReplyObject(reply);
reply = NULL;
return;
}
void check_maat_expiration(redisContext *c)
void check_maat_expiration(redisContext *c, struct log_handle *logger)
{
UNUSED int ret = 0;
@@ -1229,14 +1250,14 @@ void check_maat_expiration(redisContext *c)
freeReplyObject(data_reply);
data_reply = NULL;
int success_cnt = maat_cmd_write_rule(c, s_rule, s_rule_num, server_time);
int success_cnt = maat_cmd_write_rule(c, s_rule, s_rule_num, server_time, logger);
if (success_cnt < 0) {
fprintf(stderr, "maat_cmd_write_rule failed.\n");
log_error(logger, MODULE_REDIS_MONITOR, "maat_cmd_write_rule failed.");
} else if (success_cnt == (int)s_rule_num) {
fprintf(stdout, "Succesfully expired %zu rules in Redis\n", s_rule_num);
log_info(logger, MODULE_REDIS_MONITOR, "Succesfully expired %zu rules in Redis", s_rule_num);
} else {
fprintf(stderr, "Failed to expired %d of %zu rules in Redis, try later\n",
s_rule_num - success_cnt, s_rule_num);
log_error(logger, MODULE_REDIS_MONITOR, "Failed to expired %d of %zu rules in Redis, try later",
s_rule_num - success_cnt, s_rule_num);
}
FREE(s_rule);
@@ -1245,8 +1266,7 @@ void check_maat_expiration(redisContext *c)
void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
void (*start_fn)(long long, int, void *),
int (*update_fn)(const char *, const char *, void *),
void (*finish_fn)(void *),
void *u_param)
void (*finish_fn)(void *), void *u_param)
{
int i = 0;
int ret = 0;
@@ -1258,13 +1278,14 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
enum table_type table_type;
enum scan_type scan_type;
struct table_schema *table_schema = NULL;
struct maat *maat_instance = (struct maat *)u_param;
//authorized to write
if (mr_ctx->write_ctx != NULL && mr_ctx->write_ctx->err == 0) {
//For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
if (1 == redlock_try_lock(mr_ctx->read_ctx, mr_expire_lock, mr_expire_lock_timeout_ms)) {
check_maat_expiration(mr_ctx->read_ctx);
cleanup_update_status(mr_ctx->read_ctx);
check_maat_expiration(mr_ctx->read_ctx, maat_instance->logger);
cleanup_update_status(mr_ctx->read_ctx, maat_instance->logger);
redlock_unlock(mr_ctx->read_ctx, mr_expire_lock);
}
}
@@ -1278,9 +1299,10 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
if (mr_ctx->read_ctx != NULL) {
redisFree(mr_ctx->read_ctx);
}
fprintf(stdout, "Reconnecting...\n");
log_info(maat_instance->logger, MODULE_REDIS_MONITOR, "Reconnecting...");
mr_ctx->read_ctx = maat_cmd_connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db);
mr_ctx->read_ctx = maat_cmd_connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db,
maat_instance->logger);
if (NULL == mr_ctx->read_ctx) {
return;
} else {
@@ -1288,14 +1310,14 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
}
struct maat *maat_instance = (struct maat *)u_param;
struct serial_rule *rule_list = NULL;
long long new_version = 0;
int update_type = CM_UPDATE_TYPE_INC;
int rule_num = maat_cmd_get_rm_key_list(mr_ctx->read_ctx, version, maat_instance->load_specific_version,
&new_version, maat_instance->table_schema_mgr, &rule_list,
&update_type, maat_instance->cumulative_update_off);
&update_type, maat_instance->cumulative_update_off,
maat_instance->logger);
//redis communication error
if (rule_num < 0) {
redisFree(mr_ctx->read_ctx);
@@ -1310,12 +1332,13 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
if (rule_num > 0) {
ret = maat_cmd_get_redis_value(mr_ctx->read_ctx, rule_list, rule_num, 0);
ret = maat_cmd_get_redis_value(mr_ctx->read_ctx, rule_list, rule_num, 0, maat_instance->logger);
//redis communication error
if (ret < 0) {
redisFree(mr_ctx->read_ctx);
mr_ctx->read_ctx = NULL;
fprintf(stderr, "Get Redis value failed, abandon update and close connection\n");
log_error(maat_instance->logger, MODULE_REDIS_MONITOR,
"Get Redis value failed, abandon update and close connection");
goto clean_up;
}
@@ -1326,19 +1349,20 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
if (empty_value_num == rule_num) {
fprintf(stdout, "All %d rules are empty, abandon update\n", empty_value_num);
log_info(maat_instance->logger, MODULE_REDIS_MONITOR,
"All %d rules are empty, abandon update", empty_value_num);
goto clean_up;
}
ret = get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, maat_instance, maat_instance->foreign_cont_dir);
if (ret > 0) {
maat_cmd_get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0);
maat_cmd_get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, maat_instance->logger);
}
}
start_fn(new_version, update_type, u_param);
fprintf(stdout, "Start %s update: %lld -> %lld (%d entries)\n",
update_type==CM_UPDATE_TYPE_INC?"INC":"FULL", version, new_version, rule_num);
log_info(maat_instance->logger, MODULE_REDIS_MONITOR, "Start %s update: %lld -> %lld (%d entries)",
update_type==CM_UPDATE_TYPE_INC?"INC":"FULL", version, new_version, rule_num);
for (i = 0; i < rule_num; i++) {
if (NULL == rule_list[i].table_line) {
@@ -1360,7 +1384,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
valid_column = table_schema_get_valid_flag_column(table_schema);
ret = invalidate_line(rule_list[i].table_line, table_type, valid_column);
if (ret < 0) {
fprintf(stdout, "Invalidate line failed, invaid format %s\n", rule_list[i].table_line);
log_error(maat_instance->logger, MODULE_REDIS_MONITOR,
"Invalidate line failed, invaid format %s", rule_list[i].table_line);
continue;
}
}
@@ -1376,8 +1401,9 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
finish_fn(u_param);
if (call_update_num < rule_num) {
fprintf(stdout, "Load %d entries to match engine, no table: %d, empty value: %d\n",
call_update_num, no_table_num, empty_value_num);
log_error(maat_instance->logger, MODULE_REDIS_MONITOR,
"Load %d entries to match engine, no table: %d, empty value: %d",
call_update_num, no_table_num, empty_value_num);
}
clean_up: