[PATCH]add expr_matcher hit pattern statistics

This commit is contained in:
liuwentan
2023-12-27 12:04:15 +08:00
parent 102c8ac0f8
commit 6d5fea298a
36 changed files with 1643 additions and 1080 deletions

View File

@@ -92,15 +92,23 @@ struct expr_runtime {
struct maat_kv_store *district_map;
struct maat_kv_store *tmp_district_map;
long long *scan_cnt;
long long *scan_times;
long long *scan_cpu_time;
long long *hit_cnt;
long long update_err_cnt;
long long *scan_bytes;
long long *hit_times;
long long *hit_item_num;
long long *hit_pattern_num;
long long update_err_cnt;
};
static enum expr_type int_to_expr_type(int expr_type)
{
struct expr_runtime_stream {
struct expr_runtime *ref_expr_rt;
struct expr_matcher_stream *handle;
};
static enum expr_type int_to_expr_type(int expr_type) {
enum expr_type type = EXPR_TYPE_INVALID;
switch (expr_type) {
@@ -512,11 +520,14 @@ void *expr_runtime_new(void *expr_schema, size_t max_thread_num,
expr_rt->engine_type = schema->engine_type;
}
expr_rt->hit_cnt = alignment_int64_array_alloc(max_thread_num);
expr_rt->scan_cnt = alignment_int64_array_alloc(max_thread_num);
expr_rt->scan_times = alignment_int64_array_alloc(max_thread_num);
expr_rt->scan_bytes = alignment_int64_array_alloc(max_thread_num);
expr_rt->scan_cpu_time = alignment_int64_array_alloc(max_thread_num);
expr_rt->hit_times = alignment_int64_array_alloc(max_thread_num);
expr_rt->hit_item_num = alignment_int64_array_alloc(max_thread_num);
expr_rt->hit_pattern_num = alignment_int64_array_alloc(max_thread_num);
return expr_rt;
}
@@ -544,9 +555,9 @@ void expr_runtime_free(void *expr_runtime)
expr_rt->district_map = NULL;
}
if (expr_rt->scan_cnt != NULL) {
alignment_int64_array_free(expr_rt->scan_cnt);
expr_rt->scan_cnt = NULL;
if (expr_rt->scan_times != NULL) {
alignment_int64_array_free(expr_rt->scan_times);
expr_rt->scan_times = NULL;
}
if (expr_rt->scan_cpu_time != NULL) {
@@ -554,16 +565,26 @@ void expr_runtime_free(void *expr_runtime)
expr_rt->scan_cpu_time = NULL;
}
if (expr_rt->hit_cnt != NULL) {
alignment_int64_array_free(expr_rt->hit_cnt);
expr_rt->hit_cnt = NULL;
}
if (expr_rt->scan_bytes != NULL) {
alignment_int64_array_free(expr_rt->scan_bytes);
expr_rt->scan_bytes = NULL;
}
if (expr_rt->hit_times != NULL) {
alignment_int64_array_free(expr_rt->hit_times);
expr_rt->hit_times = NULL;
}
if (expr_rt->hit_item_num != NULL) {
alignment_int64_array_free(expr_rt->hit_item_num);
expr_rt->hit_item_num = NULL;
}
if (expr_rt->hit_pattern_num != NULL) {
alignment_int64_array_free(expr_rt->hit_pattern_num);
expr_rt->hit_pattern_num = NULL;
}
FREE(expr_rt);
}
@@ -1015,15 +1036,27 @@ int expr_runtime_scan(struct expr_runtime *expr_rt, int thread_id,
}
size_t n_hit_item = 0;
size_t n_hit_pattern = 0;
struct expr_scan_result hit_results[MAX_HIT_ITEM_NUM];
int ret = expr_matcher_match(expr_rt->matcher, thread_id, data, data_len,
hit_results, MAX_HIT_ITEM_NUM, &n_hit_item);
hit_results, MAX_HIT_ITEM_NUM, &n_hit_item,
&n_hit_pattern);
if (ret < 0) {
return -1;
}
if (n_hit_pattern > 0) {
alignment_int64_array_add(expr_rt->hit_pattern_num, state->thread_id,
n_hit_pattern);
}
if (n_hit_item > 0) {
alignment_int64_array_add(expr_rt->hit_item_num, state->thread_id,
n_hit_item);
}
struct maat_item hit_maat_items[n_hit_item];
size_t real_hit_item_cnt = 0;
size_t real_hit_item_num = 0;
if (0 == n_hit_item) {
goto next;
@@ -1041,10 +1074,10 @@ int expr_runtime_scan(struct expr_runtime *expr_rt, int thread_id,
continue;
}
hit_maat_items[real_hit_item_cnt].item_id = item_id;
hit_maat_items[real_hit_item_cnt].group_id = expr_item->group_id;
hit_maat_items[real_hit_item_num].item_id = item_id;
hit_maat_items[real_hit_item_num].group_id = expr_item->group_id;
real_hit_item_cnt++;
real_hit_item_num++;
}
}
@@ -1056,45 +1089,59 @@ next:
}
return compile_state_update(state->compile_state, state->maat_inst, vtable_id,
state->compile_table_id, state->scan_cnt,
hit_maat_items, real_hit_item_cnt);
state->compile_table_id, state->Nth_scan,
hit_maat_items, real_hit_item_num);
}
struct expr_matcher_stream *
struct expr_runtime_stream *
expr_runtime_stream_open(struct expr_runtime *expr_rt, int thread_id)
{
if (NULL == expr_rt || thread_id < 0) {
return NULL;
}
struct expr_matcher_stream *stream = expr_matcher_stream_open(expr_rt->matcher,
thread_id);
if (NULL == stream) {
struct expr_runtime_stream *expr_rt_stream = ALLOC(struct expr_runtime_stream, 1);
expr_rt_stream->ref_expr_rt = expr_rt;
expr_rt_stream->handle = expr_matcher_stream_open(expr_rt->matcher, thread_id);
if (NULL == expr_rt_stream->handle) {
FREE(expr_rt_stream);
return NULL;
}
return stream;
return expr_rt_stream;
}
int expr_runtime_stream_scan(struct expr_runtime *expr_rt,
struct expr_matcher_stream *s_handle,
int expr_runtime_stream_scan(struct expr_runtime_stream *expr_rt_stream,
const char *data, size_t data_len,
int vtable_id, struct maat_state *state)
{
struct expr_runtime *expr_rt = expr_rt_stream->ref_expr_rt;
if (0 == expr_rt->rule_num) {
//empty expr table
return 0;
}
size_t n_hit_item = 0;
size_t n_hit_pattern = 0;
struct expr_scan_result hit_results[MAX_HIT_ITEM_NUM];
int ret = expr_matcher_stream_match(s_handle, data, data_len, hit_results,
MAX_HIT_ITEM_NUM, &n_hit_item);
int ret = expr_matcher_stream_match(expr_rt_stream->handle, data, data_len, hit_results,
MAX_HIT_ITEM_NUM, &n_hit_item, &n_hit_pattern);
if (ret < 0) {
return -1;
}
if (n_hit_pattern > 0) {
alignment_int64_array_add(expr_rt->hit_pattern_num, state->thread_id,
n_hit_pattern);
}
if (n_hit_item > 0) {
alignment_int64_array_add(expr_rt->hit_item_num, state->thread_id,
n_hit_item);
}
struct maat_item hit_maat_items[n_hit_item];
struct expr_item *expr_item = NULL;
size_t real_hit_item_cnt = 0;
@@ -1126,27 +1173,22 @@ next:
}
return compile_state_update(state->compile_state, state->maat_inst, vtable_id,
state->compile_table_id, state->scan_cnt,
state->compile_table_id, state->Nth_scan,
hit_maat_items, real_hit_item_cnt);
}
void expr_runtime_stream_close(struct expr_runtime *expr_rt, int thread_id,
struct expr_matcher_stream *stream)
void expr_runtime_stream_close(struct expr_runtime_stream *expr_rt_stream)
{
if (NULL == expr_rt || thread_id < 0 || NULL == stream) {
if (NULL == expr_rt_stream) {
return;
}
expr_matcher_stream_close(stream);
}
void expr_runtime_hit_inc(struct expr_runtime *expr_rt, int thread_id)
{
if (NULL == expr_rt || thread_id < 0) {
return;
expr_rt_stream->ref_expr_rt = NULL;
if (expr_rt_stream->handle != NULL) {
expr_matcher_stream_close(expr_rt_stream->handle);
}
alignment_int64_array_add(expr_rt->hit_cnt, thread_id, 1);
FREE(expr_rt_stream);
}
void expr_runtime_perf_stat(struct expr_runtime *expr_rt, size_t scan_len,
@@ -1157,7 +1199,7 @@ void expr_runtime_perf_stat(struct expr_runtime *expr_rt, size_t scan_len,
return;
}
alignment_int64_array_add(expr_rt->scan_cnt, thread_id, 1);
alignment_int64_array_add(expr_rt->scan_times, thread_id, 1);
alignment_int64_array_add(expr_rt->scan_bytes, thread_id, scan_len);
if (start != NULL && end != NULL) {
@@ -1167,16 +1209,16 @@ void expr_runtime_perf_stat(struct expr_runtime *expr_rt, size_t scan_len,
}
}
long long expr_runtime_scan_count(void *expr_runtime)
long long expr_runtime_scan_times(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
long long sum = alignment_int64_array_sum(expr_rt->scan_cnt,
long long sum = alignment_int64_array_sum(expr_rt->scan_times,
expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->scan_cnt, expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->scan_times, expr_rt->n_worker_thread);
return sum;
}
@@ -1195,16 +1237,66 @@ long long expr_runtime_scan_cpu_time(void *expr_runtime)
return sum;
}
long long expr_runtime_hit_count(void *expr_runtime)
void expr_runtime_hit_times_inc(struct expr_runtime *expr_rt, int thread_id)
{
if (NULL == expr_rt || thread_id < 0) {
return;
}
alignment_int64_array_add(expr_rt->hit_times, thread_id, 1);
}
void expr_runtime_stream_hit_times_inc(struct expr_runtime_stream *expr_rt_stream,
int thread_id)
{
if (NULL == expr_rt_stream || thread_id < 0) {
return;
}
struct expr_runtime *expr_rt = expr_rt_stream->ref_expr_rt;
alignment_int64_array_add(expr_rt->hit_times, thread_id, 1);
}
long long expr_runtime_hit_times(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
long long sum = alignment_int64_array_sum(expr_rt->hit_cnt,
long long sum = alignment_int64_array_sum(expr_rt->hit_times,
expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->hit_cnt, expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->hit_times,
expr_rt->n_worker_thread);
return sum;
}
long long expr_runtime_hit_item_num(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
long long sum = alignment_int64_array_sum(expr_rt->hit_item_num,
expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->hit_item_num, expr_rt->n_worker_thread);
return sum;
}
long long expr_runtime_hit_pattern_num(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
long long sum = alignment_int64_array_sum(expr_rt->hit_pattern_num,
expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->hit_pattern_num,
expr_rt->n_worker_thread);
return sum;
}