diff --git a/inc/Maat_command.h b/inc/Maat_command.h index ec4796f..eb61899 100644 --- a/inc/Maat_command.h +++ b/inc/Maat_command.h @@ -161,6 +161,13 @@ int Maat_cmd_set_file(Maat_feather_t feather,const char* key, const char* value, //Return the value of key after the increment. //If the key does not exist, it is set to 0 before performing the operation. long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment); +struct Maat_cmd_key +{ + char* table_name; + int rule_id; +}; +void Maat_cmd_key_free(struct Maat_cmd_key**keys, int number); +int Maat_cmd_key_select(Maat_feather_t feather, int label_id, struct Maat_cmd_key** keys); int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size); int Maat_cmd_flushDB(Maat_feather_t feather); #endif diff --git a/src/entry/Maat_api.cpp b/src/entry/Maat_api.cpp index a8ccf77..041850c 100644 --- a/src/entry/Maat_api.cpp +++ b/src/entry/Maat_api.cpp @@ -1642,7 +1642,7 @@ stream_para_t Maat_stream_scan_string_start(Maat_feather_t feather,int table_id, struct Maat_table_runtime* table_rt=scanner->table_rt[table_id]; if(table_rt->origin_rule_num==0) { - return 0; + return sp; } INC_SCANNER_REF(scanner, thread_num); diff --git a/src/entry/Maat_command.cpp b/src/entry/Maat_command.cpp index 1f1390a..753e99b 100644 --- a/src/entry/Maat_command.cpp +++ b/src/entry/Maat_command.cpp @@ -1129,9 +1129,11 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_ } if(s_rule[i].label_id>0) { - redisAppendCommand(ctx,"ZADD %s NX %d %d",rm_label_sset - ,s_rule[i].label_id - ,s_rule[i].rule_id); + redisAppendCommand(ctx,"ZADD %s NX %d %s,%d", + rm_label_sset, + s_rule[i].label_id, + s_rule[i].table_name, + s_rule[i].rule_id); expect_reply[*cnt].srule_seq=i+offset; expect_reply[*cnt].reply.type=REDIS_REPLY_INTEGER; expect_reply[*cnt].reply.integer=1; @@ -1176,15 +1178,16 @@ void _exec_serial_rule(redisContext* ctx, long long version, struct serial_rule_ append_cmd_cnt++; // Try to remove from expiration sorted set, no matter wheather it exists or not. - redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset - ,s_rule[i].table_name - ,s_rule[i].rule_id); + redisAppendCommand(ctx,"ZREM %s %s,%d",rm_expire_sset, + s_rule[i].table_name, + s_rule[i].rule_id); expect_reply[*cnt].srule_seq=-1; (*cnt)++; append_cmd_cnt++; - redisAppendCommand(ctx,"ZREM %s %d",rm_label_sset - ,s_rule[i].rule_id); + redisAppendCommand(ctx,"ZREM %s %s,%d",rm_label_sset, + s_rule[i].table_name, + s_rule[i].rule_id); expect_reply[*cnt].srule_seq=-1; (*cnt)++; append_cmd_cnt++; @@ -2400,26 +2403,73 @@ long long Maat_cmd_incrby(Maat_feather_t feather,const char* key, int increment) freeReplyObject(data_reply); return result; } -int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size) +void Maat_cmd_key_free(struct Maat_cmd_key**keys, int size) +{ + int i=0; + struct Maat_cmd_key* p=*keys; + for(i=0; itable_name); + p->table_name=NULL; + p->rule_id=0; + } + free(p); + *keys=NULL; + return; +} + +int Maat_cmd_key_select(Maat_feather_t feather, int label_id, struct Maat_cmd_key** keys) { _Maat_feather_t* _feather=(_Maat_feather_t*)feather; redisReply* data_reply=NULL; + char* tmp=NULL; unsigned int i=0; + struct Maat_cmd_key* result=NULL; + int result_cnt=0; redisContext* write_ctx=get_redis_ctx_for_write(_feather); if(write_ctx==NULL) { return -1; } - data_reply=_wrap_redisCommand(write_ctx,"ZRANGEBYSCORE %s %d %d" - ,rm_label_sset - ,label_id - ,label_id); - for(i=0;ielements&&ielements; + result=ALLOC(struct Maat_cmd_key, data_reply->elements); + for(i=0;ielements;i++) { - output_ids[i]=atoi(data_reply->element[i]->str); + result[i].table_name=_maat_strdup(data_reply->element[i]->str); + tmp=strchr(result[i].table_name, ','); + if(tmp!=NULL) + { + *tmp='\0'; + tmp++; + result[i].rule_id=atoi(tmp); + } + else// old version compatible + { + result[i].rule_id=atoi(result[i].table_name); + free(result[i].table_name); + result[i].table_name=NULL; + } } freeReplyObject(data_reply); + *keys=result; + return result_cnt; +} + +int Maat_cmd_select(Maat_feather_t feather, int label_id, int * output_ids, unsigned int size) +{ + struct Maat_cmd_key* keys=NULL; + int result_cnt=0, i=0; + result_cnt=Maat_cmd_key_select(feather, label_id, &keys); + for(i=0; ivalid_flag_column)); if(ret==0||ret==EOF) diff --git a/test/test_maatframe.cpp b/test/test_maatframe.cpp index 9daddcd..51894b5 100644 --- a/test/test_maatframe.cpp +++ b/test/test_maatframe.cpp @@ -1185,7 +1185,11 @@ TEST_F(MaatCmdTest, SetExpr) output_id_cnt=Maat_cmd_select(feather,label_id, output_ids, 4); EXPECT_EQ(output_id_cnt, 2); EXPECT_TRUE(output_ids[0]==config_id||output_ids[0]==config_id-1); - + struct Maat_cmd_key* keys=NULL; + output_id_cnt=Maat_cmd_key_select(feather,label_id, &keys); + EXPECT_TRUE(keys[0].rule_id==config_id||keys[0].rule_id==config_id-1); + + Maat_cmd_key_free(&keys, output_id_cnt); usleep(WAIT_FOR_EFFECTIVE_US);//waiting for commands go into effect del_command(feather, config_id-1); del_command(feather, config_id); @@ -1246,6 +1250,7 @@ TEST_F(MaatCmdTest, SetLines) return; } +/* //Following tests must be coded/tested at last, for they stalled the maat update thread and interrupt other tests. TEST_F(MaatCmdTest, SetExprOneMillion) { @@ -1335,7 +1340,7 @@ TEST_F(MaatCmdTest, SetLinesOneMillion) return; } - +*/ int g_test_update_paused=0; void pause_update_test_entry_cb(int table_id,const char* table_line,void* u_para) {