[FEATURE]Refactor NOT clause, NOTE:forward incompatibility!!!

This commit is contained in:
刘文坛
2023-10-18 03:32:53 +00:00
parent 48af7e7aac
commit 613b5b3dcf
19 changed files with 1609 additions and 498 deletions

View File

@@ -43,11 +43,11 @@ const char *foreign_key_prefix = "__FILE_";
const char *mr_op_str[] = {"DEL", "ADD", "RENEW_TIMEOUT"};
#define POSSIBLE_REDIS_REPLY_SIZE 2
#define REDIS_REPLY_SIZE 2
struct expected_reply {
int s_rule_seq;
int possible_reply_num;
redisReply possible_replies[POSSIBLE_REDIS_REPLY_SIZE];
redisReply possible_replies[REDIS_REPLY_SIZE];
};
static char *get_foreign_cont_filename(const char *table_name, long long rule_id,
@@ -867,7 +867,7 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
log_error(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Get %s,%lld foreign key %s content failed, redis server error",
__FUNCTION__, __LINE__,
rule_list[track[i].rule_idx].table_name,
rule_list[track[i].rule_idx].table_name,
rule_list[track[i].rule_idx].rule_id,
rule_list[track[i].rule_idx].f_keys[track[i].foreign_idx].key);
break;
@@ -886,15 +886,15 @@ static void _get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
FILE *fp = fopen(s_rule->f_keys[track[i].foreign_idx].filename, "w");
if (NULL == fp) {
log_error(logger, MODULE_REDIS_MONITOR,
"[%s:%d] Write foreign content failed: fopen %s error",
__FUNCTION__, __LINE__, s_rule->f_keys[track[i].foreign_idx].filename);
"[%s:%d] Write foreign content failed: fopen %s error", __FUNCTION__,
__LINE__, s_rule->f_keys[track[i].foreign_idx].filename);
} else {
fwrite(reply->str, 1, reply->len, fp);
fclose(fp);
fp = NULL;
if (1 == print_fn) {
printf("[%s:%d] Written foreign content %s\n",
__FUNCTION__, __LINE__, s_rule->f_keys[track[i].foreign_idx].filename);
printf("[%s:%d] Written foreign content %s\n", __FUNCTION__,
__LINE__, s_rule->f_keys[track[i].foreign_idx].filename);
}
}
}
@@ -920,7 +920,7 @@ void maat_get_foreign_conts(redisContext *c, struct serial_rule *rule_list,
}
}
static int invalidate_line(char *line, int column_seq)
static int validate_line(char *line, int column_seq)
{
if (NULL == line || column_seq < 0) {
return -1;
@@ -957,7 +957,8 @@ void maat_rewrite_table_line_with_foreign(struct serial_rule *s_rule)
strncat(pos_rewrite_line, pos_origin_line, origin_column - pos_origin_line);
pos_rewrite_line += origin_column - pos_origin_line;
pos_origin_line = origin_column+origin_column_size;
strncat(pos_rewrite_line, s_rule->f_keys[i].filename, strlen(s_rule->f_keys[i].filename));
strncat(pos_rewrite_line, s_rule->f_keys[i].filename,
strlen(s_rule->f_keys[i].filename));
pos_rewrite_line += strlen(s_rule->f_keys[i].filename);
}
@@ -971,7 +972,7 @@ static void expected_reply_add(struct expected_reply* expected, int s_rule_seq,
int type, long long integer)
{
int i = expected->possible_reply_num;
assert(i < POSSIBLE_REDIS_REPLY_SIZE);
assert(i < REDIS_REPLY_SIZE);
expected->s_rule_seq = s_rule_seq;
expected->possible_replies[i].type = type;
expected->possible_replies[i].integer = integer;
@@ -1049,7 +1050,7 @@ const char* lua_exec_done=
"return maat_version;";
static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction_list,
long long server_time, int renew_allowed,
struct expected_reply *expect_reply, size_t *cnt)
struct expected_reply *expect_reply, size_t *cnt)
{
redisReply *data_reply = NULL;
@@ -1061,11 +1062,8 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction
if (strlen(transaction_list) > 0) {
data_reply = maat_wrap_redis_command(c, "eval %s 4 MAAT_VERSION %s %s %s %lld",
lua_exec_done,
mr_status_sset,
mr_version_sset,
transaction_list,
server_time);
lua_exec_done, mr_status_sset, mr_version_sset,
transaction_list, server_time);
freeReplyObject(data_reply);
data_reply = NULL;
expected_reply_add(expect_reply + *cnt, -1, REDIS_REPLY_INTEGER, 0);
@@ -1077,10 +1075,10 @@ static redisReply* exec_serial_rule_end(redisContext *c, const char *transaction
return data_reply;
}
static void exec_serial_rule(redisContext *c, const char *transaction_list,
struct serial_rule *s_rule, size_t rule_num,
struct expected_reply *expect_reply, size_t *cnt,
size_t offset, int renew_allowed)
static void exec_serial_rule(redisContext *c, const char *transaction_list,
struct serial_rule *s_rule, size_t rule_num,
struct expected_reply *expect_reply, size_t *cnt,
size_t offset, int renew_allowed)
{
size_t i = 0;
size_t append_cmd_cnt = 0;
@@ -1235,7 +1233,7 @@ int maat_cmd_write_rule(redisContext *c, struct serial_rule *s_rule,
char transaction_list[NAME_MAX * 2] = {0};
long long transaction_version = 0;
long long transaction_finished_version = 0;
size_t max_multi_cmd_num = MAX_REDIS_OP_PER_SRULE * serial_rule_num + 2;// 2 for operation in exec_serial_rule_end()
size_t max_multi_cmd_num = MAX_REDIS_OP_PER_SRULE * serial_rule_num + 2; // 2 for operation in exec_serial_rule_end()
struct expected_reply *expected_reply = ALLOC(struct expected_reply, max_multi_cmd_num);
for (i = 0; i < serial_rule_num; i++) {
@@ -1456,7 +1454,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
//authorized to write
if (mr_ctx->write_ctx != NULL && mr_ctx->write_ctx->err == 0) {
//For thread safe, deliberately use redis_read_ctx but not redis_write_ctx.
if (1 == redlock_try_lock(mr_ctx->read_ctx, mr_expire_lock, mr_expire_lock_timeout_ms)) {
if (1 == redlock_try_lock(mr_ctx->read_ctx, mr_expire_lock,
mr_expire_lock_timeout_ms)) {
check_maat_expiration(mr_ctx->read_ctx, maat_inst->logger);
cleanup_update_status(mr_ctx->read_ctx, maat_inst->logger);
redlock_unlock(mr_ctx->read_ctx, mr_expire_lock);
@@ -1464,7 +1463,8 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
}
if (NULL == mr_ctx->read_ctx || mr_ctx->read_ctx->err) {
if (time(NULL) - mr_ctx->last_reconnect_time < MAAT_REDIS_RECONNECT_INTERVAL_S) {
if ((time(NULL) - mr_ctx->last_reconnect_time) <
MAAT_REDIS_RECONNECT_INTERVAL_S) {
return;
}
@@ -1476,9 +1476,9 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
log_info(maat_inst->logger, MODULE_REDIS_MONITOR, "Reconnecting...");
mr_ctx->read_ctx = maat_connect_redis(mr_ctx->redis_ip,
mr_ctx->redis_port,
mr_ctx->redis_db,
maat_inst->logger);
mr_ctx->redis_port,
mr_ctx->redis_db,
maat_inst->logger);
if (NULL == mr_ctx->read_ctx) {
return;
} else {
@@ -1491,11 +1491,11 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
int update_type = MAAT_UPDATE_TYPE_INC;
int rule_num = maat_get_rm_key_list(mr_ctx->read_ctx, version,
maat_inst->load_specific_version,
&new_version, maat_inst->tbl_mgr,
&rule_list, &update_type,
maat_inst->opts.cumulative_update_off,
maat_inst->logger);
maat_inst->load_specific_version,
&new_version, maat_inst->tbl_mgr,
&rule_list, &update_type,
maat_inst->opts.cumulative_update_off,
maat_inst->logger);
//redis communication error
if (rule_num < 0) {
redisFree(mr_ctx->read_ctx);
@@ -1538,7 +1538,7 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
maat_inst, maat_inst->opts.foreign_cont_dir);
if (ret > 0) {
maat_get_foreign_conts(mr_ctx->read_ctx, rule_list, rule_num, 0,
maat_inst->logger);
maat_inst->logger);
}
}
@@ -1562,10 +1562,10 @@ void redis_monitor_traverse(long long version, struct source_redis_ctx *mr_ctx,
if (rule_list[i].op == MAAT_OP_DEL) {
valid_column = table_manager_get_valid_column(maat_inst->tbl_mgr, table_id);
ret = invalidate_line(rule_list[i].table_line, valid_column);
ret = validate_line(rule_list[i].table_line, valid_column);
if (ret < 0) {
log_error(maat_inst->logger, MODULE_REDIS_MONITOR,
"[%s:%d] Invalidate line failed, invaid format %s",
"[%s:%d] Validate line failed, invaid format %s",
__FUNCTION__, __LINE__, rule_list[i].table_line);
continue;
}