unfinished work

This commit is contained in:
liuwentan
2023-02-03 17:28:14 +08:00
parent cca7d882e1
commit 57f0a0581a
45 changed files with 2338 additions and 1522 deletions

View File

@@ -11,8 +11,9 @@
#include <string.h>
#include <assert.h>
#include <sys/stat.h>
#include <stdio.h>
#include "utils.h"
#include "maat/maat.h"
#include "maat_utils.h"
#include "maat_command.h"
#include "maat_config_monitor.h"
@@ -39,28 +40,34 @@ const char *foreign_key_prefix = "__FILE_";
const char *mr_op_str[] = {"DEL", "ADD", "RENEW_TIMEOUT"};
char *get_foreign_cont_filename(const char *table_name, int rule_id, const char *foreign_key, const char *dir)
char *get_foreign_cont_filename(const char *table_name, int rule_id,
const char *foreign_key, const char *dir)
{
char buffer[512] = {0};
snprintf(buffer, sizeof(buffer),"%s/%s-%d-%s", dir, table_name, rule_id, foreign_key);
snprintf(buffer, sizeof(buffer),"%s/%s-%d-%s", dir,
table_name, rule_id, foreign_key);
char *filename = ALLOC(char, strlen(buffer) + 1);
memcpy(filename, buffer, strlen(buffer));
return filename;
}
void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_foreign,
const char *dir, struct log_handle *logger)
void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns,
int n_foreign, const char *dir, struct log_handle *logger)
{
int foreign_key_size = 0;
p_rule->f_keys = ALLOC(struct foreign_key, n_foreign);
for (int i = 0; i < n_foreign; i++) {
const char *p_foreign = maat_cmd_find_Nth_column(p_rule->table_line, foreign_columns[i], &foreign_key_size);
const char *p_foreign = maat_cmd_find_Nth_column(p_rule->table_line,
foreign_columns[i],
&foreign_key_size);
if (NULL == p_foreign) {
log_error(logger, MODULE_REDIS_MONITOR, "Get %s,%lu foreign keys failed: No %dth column",
p_rule->table_name, p_rule->rule_id, foreign_columns[i]);
log_error(logger, MODULE_REDIS_MONITOR,
"Get %s,%lu foreign keys failed: No %dth column",
p_rule->table_name, p_rule->rule_id,
foreign_columns[i]);
continue;
}
@@ -70,7 +77,8 @@ void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_f
}
if (0 != strncmp(p_foreign, foreign_source_prefix, strlen(foreign_source_prefix))) {
log_error(logger, MODULE_REDIS_MONITOR, "Get %s,%lu foreign key failed: Invalid source prefix %s",
log_error(logger, MODULE_REDIS_MONITOR,
"Get %s,%lu foreign key failed: Invalid source prefix %s",
p_rule->table_name, p_rule->rule_id, p_foreign);
continue;
}
@@ -80,13 +88,17 @@ void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_f
p_foreign += strlen(foreign_source_prefix);
if (0 != strncmp(p_foreign, foreign_key_prefix, strlen(foreign_key_prefix))) {
log_info(logger, MODULE_REDIS_MONITOR, "%s, %lu foreign key prefix %s is not recommended",
log_info(logger, MODULE_REDIS_MONITOR,
"%s, %lu foreign key prefix %s is not recommended",
p_rule->table_name, p_rule->rule_id, p_foreign);
}
p_rule->f_keys[p_rule->n_foreign].key = ALLOC(char, foreign_key_size+1);
memcpy(p_rule->f_keys[p_rule->n_foreign].key, p_foreign, foreign_key_size);
p_rule->f_keys[p_rule->n_foreign].filename = get_foreign_cont_filename(p_rule->table_name, p_rule->rule_id, p_rule->f_keys[p_rule->n_foreign].key, dir);
p_rule->f_keys[p_rule->n_foreign].filename = get_foreign_cont_filename(p_rule->table_name,
p_rule->rule_id,
p_rule->f_keys[p_rule->n_foreign].key,
dir);
p_rule->n_foreign++;
}
@@ -95,7 +107,8 @@ void _get_foregin_keys(struct serial_rule *p_rule, int *foreign_columns, int n_f
}
}
int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list, int rule_num, struct maat *maat_instance, const char *dir)
int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list,
int rule_num, struct maat *maat_instance, const char *dir)
{
int rule_with_foreign_key = 0;
@@ -112,7 +125,8 @@ int get_foreign_keys_define(redisContext *ctx, struct serial_rule *rule_list, in
}
int foreign_columns[8];
int n_foreign_column = plugin_table_get_foreign_column((struct plugin_schema *)schema, foreign_columns);
int n_foreign_column = plugin_table_get_foreign_column((struct plugin_schema *)schema,
foreign_columns);
if (0 == n_foreign_column) {
continue;
}
@@ -175,9 +189,10 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
redisReply* reply = NULL;
for (i = 0; i < rule_num; i++) {
snprintf(redis_cmd, sizeof(redis_cmd), "GET %s:%s,%lu", mr_key_prefix[rule_list[i].op],
rule_list[i].table_name,
rule_list[i].rule_id);
snprintf(redis_cmd, sizeof(redis_cmd),
"GET %s:%s,%lu", mr_key_prefix[rule_list[i].op],
rule_list[i].table_name,
rule_list[i].rule_id);
ret = redisAppendCommand(c, redis_cmd);
assert(ret == REDIS_OK);
}
@@ -185,9 +200,10 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
for (i = 0; i < rule_num; i++) {
ret = maat_cmd_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
log_error(logger, MODULE_REDIS_MONITOR, "Redis GET %s:%s,%lu failed, redis server error",
mr_key_prefix[rule_list[i].op],
rule_list[i].table_name, rule_list[i].rule_id);
log_error(logger, MODULE_REDIS_MONITOR,
"Redis GET %s:%s,%lu failed, redis server error",
mr_key_prefix[rule_list[i].op],
rule_list[i].table_name, rule_list[i].rule_id);
error_happened = 1;
break;
}
@@ -199,9 +215,10 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
retry_ids[failed_cnt] = i;
failed_cnt++;
} else {
log_error(logger, MODULE_REDIS_MONITOR, "Redis GET %s:%s,%lu failed",
mr_key_prefix[rule_list[i].op],
rule_list[i].table_name, rule_list[i].rule_id);
log_error(logger, MODULE_REDIS_MONITOR,
"Redis GET %s:%s,%lu failed",
mr_key_prefix[rule_list[i].op],
rule_list[i].table_name, rule_list[i].rule_id);
error_happened = 1;
}
}
@@ -218,9 +235,10 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
int idx = 0;
for (i = 0; i < failed_cnt; i++) {
idx = retry_ids[i];
snprintf(redis_cmd, sizeof(redis_cmd), "GET %s:%s,%lu", mr_key_prefix[MAAT_OP_DEL],
rule_list[idx].table_name,
rule_list[idx].rule_id);
snprintf(redis_cmd, sizeof(redis_cmd),
"GET %s:%s,%lu", mr_key_prefix[MAAT_OP_DEL],
rule_list[idx].table_name,
rule_list[idx].rule_id);
ret = redisAppendCommand(c, redis_cmd);
}
@@ -228,7 +246,9 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
idx = retry_ids[i];
ret = maat_cmd_wrap_redis_get_reply(c, &reply);
if (ret == REDIS_ERR) {
log_error(logger, MODULE_REDIS_MONITOR, "redis command %s failed, redis server error", redis_cmd);
log_error(logger, MODULE_REDIS_MONITOR,
"redis command %s failed, redis server error",
redis_cmd);
FREE(retry_ids);
return -1;
}
@@ -237,12 +257,14 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
rule_list[idx].table_line = maat_strdup(reply->str);
} else if(reply->type==REDIS_REPLY_ERROR) {
//Deal with Redis response: "Loading Redis is loading the database in memory"
log_error(logger, MODULE_REDIS_MONITOR, "redis command %s error, reply type=%d, error str=%s",
redis_cmd, reply->type, reply->str);
log_error(logger, MODULE_REDIS_MONITOR,
"redis command %s error, reply type=%d, error str=%s",
redis_cmd, reply->type, reply->str);
} else {
//Handle type "nil"
log_error(logger, MODULE_REDIS_MONITOR, "redis command %s failed, reply type=%d",
redis_cmd, reply->type);
log_error(logger, MODULE_REDIS_MONITOR,
"redis command %s failed, reply type=%d",
redis_cmd, reply->type);
}
freeReplyObject(reply);
@@ -254,8 +276,8 @@ int _get_maat_redis_value(redisContext *c, struct serial_rule *rule_list, int ru
return 0;
}
int maat_cmd_get_redis_value(redisContext *c, struct serial_rule *rule_list, int rule_num,
int print_process, struct log_handle *logger)
int maat_cmd_get_redis_value(redisContext *c, struct serial_rule *rule_list,
int rule_num, int print_process, struct log_handle *logger)
{
int max_redis_batch = 4096;
int success_cnt = 0;
@@ -286,14 +308,17 @@ int maat_cmd_get_redis_value(redisContext *c, struct serial_rule *rule_list, int
}
int get_inc_key_list(long long instance_version, long long target_version,
redisContext *c, struct serial_rule **list, struct log_handle *logger)
redisContext *c, struct serial_rule **list,
struct log_handle *logger)
{
//Returns all the elements in the sorted set at key with a score that instance_version < score <= redis_version.
//The elements are considered to be ordered from low to high scores(instance_version).
redisReply *reply = (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld", mr_status_sset,
instance_version,target_version);
redisReply *reply = (redisReply *)redisCommand(c, "ZRANGEBYSCORE %s (%lld %lld",
mr_status_sset, instance_version,
target_version);
if (NULL == reply) {
log_error(logger, MODULE_REDIS_MONITOR, "GET %s failed with a NULL reply, error: %s",
log_error(logger, MODULE_REDIS_MONITOR,
"GET %s failed with a NULL reply, error: %s",
mr_status_sset, c->errstr);
return -1;
}
@@ -307,10 +332,14 @@ int get_inc_key_list(long long instance_version, long long target_version,
return 0;
}
redisReply *tmp_reply= maat_cmd_wrap_redis_command(c, "ZSCORE %s %s", mr_status_sset, reply->element[0]->str);
redisReply *tmp_reply= maat_cmd_wrap_redis_command(c, "ZSCORE %s %s",
mr_status_sset,
reply->element[0]->str);
if (tmp_reply->type != REDIS_REPLY_STRING) {
log_error(logger, MODULE_REDIS_MONITOR, "ZSCORE %s %s failed Version: %lld->%lld",
mr_status_sset, reply->element[0]->str, instance_version, target_version);
log_error(logger, MODULE_REDIS_MONITOR,
"ZSCORE %s %s failed Version: %lld->%lld",
mr_status_sset, reply->element[0]->str,
instance_version, target_version);
freeReplyObject(tmp_reply);
tmp_reply = NULL;
freeReplyObject(reply);
@@ -327,7 +356,8 @@ int get_inc_key_list(long long instance_version, long long target_version,
}
if (nearest_rule_version != instance_version + 1) {
log_info(logger, MODULE_REDIS_MONITOR, "Noncontinuous VERSION Redis: %lld MAAT: %lld",
log_info(logger, MODULE_REDIS_MONITOR,
"Noncontinuous VERSION Redis: %lld MAAT: %lld",
nearest_rule_version, instance_version);
}
@@ -338,10 +368,11 @@ int get_inc_key_list(long long instance_version, long long target_version,
for (i = 0, j = 0; i < (int)reply->elements; i++) {
assert(reply->element[i]->type == REDIS_REPLY_STRING);
int ret = sscanf(reply->element[i]->str, "%[^,],%[^,],%lu", op_str,
s_rule[j].table_name, &(s_rule[j].rule_id));
int ret = sscanf(reply->element[i]->str, "%[^,],%[^,],%lu",
op_str, s_rule[j].table_name, &(s_rule[j].rule_id));
if (ret != 3 || s_rule[i].rule_id < 0) {
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key: %s", reply->element[i]->str);
log_error(logger, MODULE_REDIS_MONITOR,
"Invalid Redis Key: %s", reply->element[i]->str);
continue;
}
@@ -350,7 +381,8 @@ int get_inc_key_list(long long instance_version, long long target_version,
} else if(strncmp(op_str, "DEL", strlen("DEL")) == 0) {
s_rule[j].op = MAAT_OP_DEL;
} else {
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key: %s", reply->element[i]->str);
log_error(logger, MODULE_REDIS_MONITOR,
"Invalid Redis Key: %s", reply->element[i]->str);
continue;
}
j++;
@@ -398,7 +430,8 @@ struct serial_rule *serial_rule_clone(const struct serial_rule *s_rule)
new_rule->f_keys[j].key = ALLOC(char, s_rule->f_keys[j].key_len);
memcpy(new_rule->f_keys[j].key, s_rule->f_keys[j].key, s_rule->f_keys[j].key_len);
new_rule->f_keys[j].filename = ALLOC(char, strlen(s_rule->f_keys[j].filename));
memcpy(new_rule->f_keys[j].filename, s_rule->f_keys[j].filename, strlen(s_rule->f_keys[j].filename));
memcpy(new_rule->f_keys[j].filename, s_rule->f_keys[j].filename,
strlen(s_rule->f_keys[j].filename));
}
return new_rule;
@@ -466,10 +499,10 @@ int recovery_history_version(const struct serial_rule *current, int current_num,
return ret;
}
int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long long desired_version,
long long *new_version, struct table_manager *tbl_mgr,
struct serial_rule **list, int *update_type, int cumulative_off,
struct log_handle *logger)
int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version,
long long desired_version, long long *new_version,
struct table_manager *tbl_mgr, struct serial_rule **list,
int *update_type, int cumulative_off, struct log_handle *logger)
{
int rule_num = 0;
long long target_version = 0;
@@ -478,20 +511,23 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
redisReply *reply = (redisReply *)redisCommand(c, "GET MAAT_VERSION");
if (reply != NULL) {
if (reply->type == REDIS_REPLY_NIL || reply->type == REDIS_REPLY_ERROR) {
log_error(logger, MODULE_REDIS_MONITOR, "GET MAAT_VERSION failed, maybe Redis is busy");
log_error(logger, MODULE_REDIS_MONITOR,
"GET MAAT_VERSION failed, maybe Redis is busy");
freeReplyObject(reply);
reply = NULL;
return -1;
}
} else {
log_error(logger, MODULE_REDIS_MONITOR, "GET MAAT_VERSION failed with NULL reply, error: %s", c->errstr);
log_error(logger, MODULE_REDIS_MONITOR,
"GET MAAT_VERSION failed with NULL reply, error: %s", c->errstr);
return -1;
}
long long redis_version = maat_cmd_read_redis_integer(reply);
if (redis_version < 0) {
if (reply->type == REDIS_REPLY_ERROR) {
log_error(logger, MODULE_REDIS_MONITOR, "Redis Communication error: %s", reply->str);
log_error(logger, MODULE_REDIS_MONITOR,
"Redis Communication error: %s", reply->str);
}
return -1;
}
@@ -508,7 +544,8 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
}
if (redis_version < instance_version) {
log_error(logger, MODULE_REDIS_MONITOR, "VERSION roll back MAAT: %lld -> Redis: %lld",
log_error(logger, MODULE_REDIS_MONITOR,
"VERSION roll back MAAT: %lld -> Redis: %lld",
instance_version, redis_version);
goto FULL_UPDATE;
}
@@ -521,7 +558,8 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
do {
target_version++;
rule_num = get_inc_key_list(instance_version, target_version, c, &s_rule_array, logger);
rule_num = get_inc_key_list(instance_version, target_version,
c, &s_rule_array, logger);
if (rule_num > 0) {
break;
} else if (rule_num < 0) {
@@ -533,21 +571,25 @@ int maat_cmd_get_rm_key_list(redisContext *c, long long instance_version, long l
} while (0 == rule_num && target_version <= redis_version && 1 == cumulative_off);
if (0 == rule_num) {
log_info(logger, MODULE_REDIS_MONITOR, "Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s",
mr_status_sset, instance_version, target_version-1, cumulative_off == 1 ? "OFF" : "ON");
log_info(logger, MODULE_REDIS_MONITOR,
"Got nothing after ZRANGEBYSCORE %s (%lld %lld, cumulative %s",
mr_status_sset, instance_version, target_version-1,
cumulative_off == 1 ? "OFF" : "ON");
return 0;
}
log_info(logger, MODULE_REDIS_MONITOR, "Inc Update from instance_version %lld to %lld (%d entries)",
log_info(logger, MODULE_REDIS_MONITOR,
"Inc Update from instance_version %lld to %lld (%d entries)",
instance_version, target_version, rule_num);
*list = s_rule_array;
*update_type = CM_UPDATE_TYPE_INC;
*update_type = MAAT_UPDATE_TYPE_INC;
*new_version = target_version;
return rule_num;
FULL_UPDATE:
log_info(logger, MODULE_REDIS_MONITOR, "Initiate full update from instance_version %lld to %lld",
log_info(logger, MODULE_REDIS_MONITOR,
"Initiate full update from instance_version %lld to %lld",
instance_version, desired_version == 0 ? redis_version : desired_version);
size_t append_cmd_cnt = 0;
int ret = redisAppendCommand(c, "MULTI");
@@ -569,12 +611,14 @@ FULL_UPDATE:
reply = maat_cmd_wrap_redis_command(c, "EXEC");
if (NULL == reply) {
log_error(logger, MODULE_REDIS_MONITOR, "Redis Communication error: %s", c->errstr);
log_error(logger, MODULE_REDIS_MONITOR,
"Redis Communication error: %s", c->errstr);
return -1;
}
if (reply->type != REDIS_REPLY_ARRAY) {
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key List type %d", reply->type);
log_error(logger, MODULE_REDIS_MONITOR,
"Invalid Redis Key List type %d", reply->type);
freeReplyObject(reply);
reply = NULL;
return -1;
@@ -583,7 +627,8 @@ FULL_UPDATE:
*new_version = maat_cmd_read_redis_integer(reply->element[0]);
redisReply *sub_reply = reply->element[1];
if (sub_reply->type != REDIS_REPLY_ARRAY) {
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key List type %d", sub_reply->type);
log_error(logger, MODULE_REDIS_MONITOR,
"Invalid Redis Key List type %d", sub_reply->type);
freeReplyObject(reply);
reply = NULL;
return -1;
@@ -593,17 +638,21 @@ FULL_UPDATE:
s_rule_array = ALLOC(struct serial_rule, sub_reply->elements);
for (i = 0, full_idx = 0; i < sub_reply->elements; i++) {
if (sub_reply->element[i]->type != REDIS_REPLY_STRING) {
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key Type: %d", sub_reply->element[i]->type);
log_error(logger, MODULE_REDIS_MONITOR,
"Invalid Redis Key Type: %d", sub_reply->element[i]->type);
continue;
}
ret = sscanf(sub_reply->element[i]->str, "%*[^:]:%[^,],%ld",
s_rule_array[full_idx].table_name,
&(s_rule_array[full_idx].rule_id));
s_rule_array[full_idx].table_name,
&(s_rule_array[full_idx].rule_id));
s_rule_array[full_idx].op = MAAT_OP_ADD;
if (ret != 2 || s_rule_array[full_idx].rule_id < 0 || strlen(s_rule_array[full_idx].table_name) == 0) {
log_error(logger, MODULE_REDIS_MONITOR, "Invalid Redis Key Format: %s", sub_reply->element[i]->str);
if (ret != 2 || s_rule_array[full_idx].rule_id < 0 ||
strlen(s_rule_array[full_idx].table_name) == 0) {
log_error(logger, MODULE_REDIS_MONITOR,
"Invalid Redis Key Format: %s",
sub_reply->element[i]->str);
continue;
}
@@ -623,22 +672,27 @@ FULL_UPDATE:
if (desired_version != 0) {
struct serial_rule *changed_rule_array = NULL;
int changed_rule_num = get_inc_key_list(desired_version, redis_version, c, &changed_rule_array, logger);
int changed_rule_num = get_inc_key_list(desired_version, redis_version,
c, &changed_rule_array, logger);
if (changed_rule_num < 0) {
log_error(logger, MODULE_REDIS_MONITOR, "Recover history version %lld faild where as redis version is %lld",
log_error(logger, MODULE_REDIS_MONITOR,
"Recover history version %lld faild where as redis version is %lld",
desired_version, redis_version);
} else if(0 == changed_rule_num) {
log_error(logger, MODULE_REDIS_MONITOR, "Nothing to recover from history version %lld to redis version is %lld",
log_error(logger, MODULE_REDIS_MONITOR,
"Nothing to recover from history version %lld to redis version is %lld",
desired_version, redis_version);
} else {
struct serial_rule *history_rule_array = NULL;
ret = recovery_history_version(s_rule_array, full_idx, changed_rule_array, changed_rule_num, &history_rule_array);
ret = recovery_history_version(s_rule_array, full_idx, changed_rule_array,
changed_rule_num, &history_rule_array);
if (ret > 0) {
FREE(s_rule_array);
s_rule_array = history_rule_array;
rule_num = ret;
*new_version = desired_version;
log_info(logger, MODULE_REDIS_MONITOR, "Successfully recovered from history version %lld to redis version is %lld",
log_info(logger, MODULE_REDIS_MONITOR,
"Successfully recovered from history version %lld to redis version is %lld",
desired_version, redis_version);
}
}
@@ -646,14 +700,15 @@ FULL_UPDATE:
}
*list = s_rule_array;
*update_type = CM_UPDATE_TYPE_FULL;
log_info(logger, MODULE_REDIS_MONITOR, "Full update %d keys of version %lld", rule_num, *new_version);
*update_type = MAAT_UPDATE_TYPE_FULL;
log_info(logger, MODULE_REDIS_MONITOR,
"Full update %d keys of version %lld", rule_num, *new_version);
return rule_num ;
}
void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule_num, int print_fn,
struct log_handle *logger)
void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
int rule_num, int print_fn, struct log_handle *logger)
{
int i = 0;
int j = 0;
@@ -676,8 +731,9 @@ void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule
ret = remove(rule_list[i].f_keys[j].filename);
if (ret == -1) {
log_error(logger, MODULE_REDIS_MONITOR, "Foreign content file %s remove failed",
rule_list[i].f_keys[j].filename);
log_error(logger, MODULE_REDIS_MONITOR,
"Foreign content file %s remove failed",
rule_list[i].f_keys[j].filename);
}
}
} else {
@@ -734,7 +790,8 @@ void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule
fclose(fp);
fp = NULL;
if (1 == print_fn) {
printf("Written foreign content %s\n", s_rule->f_keys[track[i].foreign_idx].filename);
printf("Written foreign content %s\n",
s_rule->f_keys[track[i].foreign_idx].filename);
}
}
}
@@ -747,8 +804,8 @@ void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule
return;
}
void maat_cmd_get_foreign_conts(redisContext *c, struct serial_rule *rule_list, int rule_num, int print_fn,
struct log_handle *logger)
void maat_cmd_get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
int rule_num, int print_fn, struct log_handle *logger)
{
int max_redis_batch = 4096;
int success_cnt = 0;
@@ -797,12 +854,14 @@ void maat_cmd_rewrite_table_line_with_foreign(struct serial_rule *s_rule)
pos_rewrite_line += strlen(s_rule->f_keys[i].filename);
}
strncat(pos_rewrite_line, pos_origin_line, strlen(s_rule->table_line) - (pos_origin_line - s_rule->table_line));
strncat(pos_rewrite_line, pos_origin_line,
strlen(s_rule->table_line) - (pos_origin_line - s_rule->table_line));
FREE(s_rule->table_line);
s_rule->table_line = rewrite_line;
}
void expected_reply_add(struct expected_reply* expected, int s_rule_seq, int type, long long integer)
void expected_reply_add(struct expected_reply* expected, int s_rule_seq,
int type, long long integer)
{
int i = expected->possible_reply_num;
assert(i < POSSIBLE_REDIS_REPLY_SIZE);
@@ -816,7 +875,8 @@ int redlock_try_lock(redisContext *c, const char *lock_name, long long expire)
{
int ret = 0;
redisReply *reply = maat_cmd_wrap_redis_command(c, "SET %s locked NX PX %lld", lock_name, expire);
redisReply *reply = maat_cmd_wrap_redis_command(c, "SET %s locked NX PX %lld",
lock_name, expire);
if (reply->type == REDIS_REPLY_NIL) {
ret = 0;
} else {
@@ -829,8 +889,9 @@ int redlock_try_lock(redisContext *c, const char *lock_name, long long expire)
return ret;
}
long long exec_serial_rule_begin(redisContext* c, size_t rule_num, size_t renew_rule_num,
int *renew_allowed, long long *transaction_version)
long long exec_serial_rule_begin(redisContext* c, size_t rule_num,
size_t renew_rule_num, int *renew_allowed,
long long *transaction_version)
{
int ret = -1;
redisReply *data_reply = NULL;
@@ -878,8 +939,9 @@ const char* lua_exec_done=
"redis.call(\'del\', KEYS[4]);"
"redis.call(\'zadd\', KEYS[3], ARGV[1], maat_version);"
"return maat_version;";
redisReply* exec_serial_rule_end(redisContext *c, const char *transaction_list, long long server_time,
int renew_allowed, struct expected_reply *expect_reply, size_t *cnt)
redisReply* exec_serial_rule_end(redisContext *c, const char *transaction_list,
long long server_time, int renew_allowed,
struct expected_reply *expect_reply, size_t *cnt)
{
redisReply *data_reply = NULL;
@@ -907,8 +969,10 @@ redisReply* exec_serial_rule_end(redisContext *c, const char *transaction_list,
return data_reply;
}
void exec_serial_rule(redisContext *c, const char *transaction_list, struct serial_rule *s_rule, size_t rule_num,
struct expected_reply *expect_reply, size_t *cnt, size_t offset, int renew_allowed)
void exec_serial_rule(redisContext *c, const char *transaction_list,
struct serial_rule *s_rule, size_t rule_num,
struct expected_reply *expect_reply, size_t *cnt,
size_t offset, int renew_allowed)
{
size_t i = 0;
size_t append_cmd_cnt = 0;
@@ -1053,8 +1117,9 @@ int mr_operation_success(redisReply *actual_reply, struct expected_reply *expect
return 0;
}
int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, size_t serial_rule_num,
long long server_time, struct log_handle *logger)
int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
size_t serial_rule_num, long long server_time,
struct log_handle *logger)
{
size_t i = 0;
size_t rule_seq = 0;
@@ -1079,7 +1144,8 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, size_t seri
}
}
int ret = exec_serial_rule_begin(c, serial_rule_num, renew_num, &renew_allowed, &transaction_version);
int ret = exec_serial_rule_begin(c, serial_rule_num, renew_num, &renew_allowed,
&transaction_version);
//Preconditions for transaction are not satisfied.
if (ret != 0) {
success_cnt = -1;
@@ -1087,18 +1153,20 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule, size_t seri
}
if (transaction_version > 0) {
snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld", transaction_version);
snprintf(transaction_list, sizeof(transaction_list), "MAAT_TRANSACTION_%lld",
transaction_version);
}
while (success_cnt < serial_rule_num) {
size_t batch_cnt = MIN(serial_rule_num - success_cnt, max_redis_batch);
exec_serial_rule(c, transaction_list, s_rule + success_cnt, batch_cnt, expected_reply, &multi_cmd_cnt,
success_cnt, renew_allowed);
exec_serial_rule(c, transaction_list, s_rule + success_cnt, batch_cnt,
expected_reply, &multi_cmd_cnt, success_cnt, renew_allowed);
assert(multi_cmd_cnt<max_multi_cmd_num);
success_cnt+=batch_cnt;
}
transaction_reply = exec_serial_rule_end(c, transaction_list, server_time, renew_allowed, expected_reply, &multi_cmd_cnt);
transaction_reply = exec_serial_rule_end(c, transaction_list, server_time, renew_allowed,
expected_reply, &multi_cmd_cnt);
if (1 == mr_transaction_success(transaction_reply)) {
assert(transaction_reply->elements == multi_cmd_cnt);
for (i = 0; i < multi_cmd_cnt; i++) {
@@ -1138,8 +1206,10 @@ error_out:
if (renew_num > 0 && renew_allowed != 1) {
for (i = 0; i < (unsigned int)serial_rule_num; i++) {
if (s_rule[i].op == MAAT_OP_RENEW_TIMEOUT) {
log_error(logger, MODULE_REDIS_MONITOR, "%s %s %lu is not allowed due to lock contention",
mr_op_str[MAAT_OP_RENEW_TIMEOUT], s_rule[i].table_name, s_rule[i].rule_id);
log_error(logger, MODULE_REDIS_MONITOR,
"%s %s %lu is not allowed due to lock contention",
mr_op_str[MAAT_OP_RENEW_TIMEOUT], s_rule[i].table_name,
s_rule[i].rule_id);
}
}
@@ -1171,11 +1241,11 @@ void cleanup_update_status(redisContext *c, struct log_handle *logger)
int append_cmd_cnt = 0;
redisAppendCommand(c, "ZRANGEBYSCORE %s -inf %lld",
mr_version_sset, server_time - MAAT_REDIS_SYNC_TIME);
mr_version_sset, server_time - MAAT_REDIS_SYNC_TIME);
append_cmd_cnt++;
redisAppendCommand(c, "ZREMRANGEBYSCORE %s -inf %lld",
mr_version_sset,server_time - MAAT_REDIS_SYNC_TIME);
mr_version_sset,server_time - MAAT_REDIS_SYNC_TIME);
append_cmd_cnt++;
//consume reply "OK" and "QUEUED".
@@ -1207,8 +1277,9 @@ void cleanup_update_status(redisContext *c, struct log_handle *logger)
reply = NULL;
//To deal with maat_version reset to 0, do NOT use -inf as lower bound intentionally.
reply = maat_cmd_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld", mr_status_sset,
version_lower_bound, version_upper_bound);
reply = maat_cmd_wrap_redis_command(c, "ZREMRANGEBYSCORE %s %lld %lld",
mr_status_sset, version_lower_bound,
version_upper_bound);
entry_num = maat_cmd_read_redis_integer(reply);
freeReplyObject(reply);
reply = NULL;
@@ -1232,7 +1303,8 @@ void check_maat_expiration(redisContext *c, struct log_handle *logger)
return;
}
redisReply *data_reply= maat_cmd_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld", mr_expire_sset, server_time);
redisReply *data_reply= maat_cmd_wrap_redis_command(c, "ZRANGEBYSCORE %s -inf %lld",
mr_expire_sset, server_time);
if (data_reply->type != REDIS_REPLY_ARRAY || 0 == data_reply->elements) {
freeReplyObject(data_reply);
data_reply = NULL;
@@ -1244,7 +1316,8 @@ void check_maat_expiration(redisContext *c, struct log_handle *logger)
for (size_t i = 0; i < s_rule_num; i++) {
s_rule[i].op = MAAT_OP_DEL;
ret = sscanf(data_reply->element[i]->str, "%[^,],%ld", s_rule[i].table_name, &(s_rule[i].rule_id));
ret = sscanf(data_reply->element[i]->str, "%[^,],%ld",
s_rule[i].table_name, &(s_rule[i].rule_id));
assert(ret == 2);
}
freeReplyObject(data_reply);
@@ -1254,9 +1327,11 @@ void check_maat_expiration(redisContext *c, struct log_handle *logger)
if (success_cnt < 0) {
log_error(logger, MODULE_REDIS_MONITOR, "maat_cmd_write_rule failed.");
} else if (success_cnt == (int)s_rule_num) {
log_info(logger, MODULE_REDIS_MONITOR, "Succesfully expired %zu rules in Redis", s_rule_num);
log_info(logger, MODULE_REDIS_MONITOR,
"Succesfully expired %zu rules in Redis", s_rule_num);
} else {
log_error(logger, MODULE_REDIS_MONITOR, "Failed to expired %d of %zu rules in Redis, try later",
log_error(logger, MODULE_REDIS_MONITOR,
"Failed to expired %d of %zu rules in Redis, try later",
s_rule_num - success_cnt, s_rule_num);
}
@@ -1301,7 +1376,9 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
log_info(maat_instance->logger, MODULE_REDIS_MONITOR, "Reconnecting...");
mr_ctx->read_ctx = maat_cmd_connect_redis(mr_ctx->redis_ip, mr_ctx->redis_port, mr_ctx->redis_db,
mr_ctx->read_ctx = maat_cmd_connect_redis(mr_ctx->redis_ip,
mr_ctx->redis_port,
mr_ctx->redis_db,
maat_instance->logger);
if (NULL == mr_ctx->read_ctx) {
return;
@@ -1312,11 +1389,13 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
struct serial_rule *rule_list = NULL;
long long new_version = 0;
int update_type = CM_UPDATE_TYPE_INC;
int update_type = MAAT_UPDATE_TYPE_INC;
int rule_num = maat_cmd_get_rm_key_list(mr_ctx->read_ctx, version, maat_instance->load_specific_version,
&new_version, maat_instance->tbl_mgr, &rule_list,
&update_type, maat_instance->cumulative_update_off,
int rule_num = maat_cmd_get_rm_key_list(mr_ctx->read_ctx, version,
maat_instance->load_specific_version,
&new_version, maat_instance->tbl_mgr,
&rule_list, &update_type,
maat_instance->cumulative_update_off,
maat_instance->logger);
//redis communication error
if (rule_num < 0) {
@@ -1327,12 +1406,13 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
maat_instance->load_specific_version = 0;//only valid for one time.
//error or nothing changed
if (0 == rule_num && update_type == CM_UPDATE_TYPE_INC) {
if (0 == rule_num && update_type == MAAT_UPDATE_TYPE_INC) {
return;
}
if (rule_num > 0) {
ret = maat_cmd_get_redis_value(mr_ctx->read_ctx, rule_list, rule_num, 0, maat_instance->logger);
ret = maat_cmd_get_redis_value(mr_ctx->read_ctx, rule_list, rule_num,
0, maat_instance->logger);
//redis communication error
if (ret < 0) {
redisFree(mr_ctx->read_ctx);
@@ -1354,15 +1434,19 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
goto clean_up;
}
ret = get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num, maat_instance, maat_instance->foreign_cont_dir);
ret = get_foreign_keys_define(mr_ctx->read_ctx, rule_list, rule_num,
maat_instance, maat_instance->foreign_cont_dir);
if (ret > 0) {
maat_cmd_get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0, maat_instance->logger);
maat_cmd_get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0,
maat_instance->logger);
}
}
start_fn(new_version, update_type, u_param);
log_info(maat_instance->logger, MODULE_REDIS_MONITOR, "Start %s update: %lld -> %lld (%d entries)",
update_type==CM_UPDATE_TYPE_INC?"INC":"FULL", version, new_version, rule_num);
log_info(maat_instance->logger, MODULE_REDIS_MONITOR,
"Start %s update: %lld -> %lld (%d entries)",
update_type == MAAT_UPDATE_TYPE_INC ? "INC" : "FULL",
version, new_version, rule_num);
for (i = 0; i < rule_num; i++) {
if (NULL == rule_list[i].table_line) {
@@ -1385,7 +1469,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
ret = invalidate_line(rule_list[i].table_line, table_type, valid_column);
if (ret < 0) {
log_error(maat_instance->logger, MODULE_REDIS_MONITOR,
"Invalidate line failed, invaid format %s", rule_list[i].table_line);
"Invalidate line failed, invaid format %s",
rule_list[i].table_line);
continue;
}
}