support maat stat

This commit is contained in:
liuwentan
2023-04-20 15:34:56 +08:00
parent ff4666ca9d
commit af6df5951a
35 changed files with 1758 additions and 257 deletions

View File

@@ -72,8 +72,10 @@ struct expr_runtime {
struct rcu_hash_table *item_htable; // store this expr table's all maat_item which will be used in expr_runtime_scan
long long version; //expr_rt version
long long regex_rule_num;
long long rule_num;
int n_worker_thread;
long long update_err_cnt;
size_t n_worker_thread;
struct maat_garbage_bin *ref_garbage_bin;
struct log_handle *logger;
int district_num;
@@ -81,8 +83,10 @@ struct expr_runtime {
struct maat_kv_store *tmp_district_map;
long long *scan_cnt;
long long *scan_cpu_time;
long long *hit_cnt;
// long long *stream_num;
long long *stream_num;
long long *scan_bytes;
};
enum expr_type int_to_expr_type(int expr_type)
@@ -446,7 +450,7 @@ void expr_maat_item_free(void *user_ctx, void *data)
maat_item_free(item);
}
void *expr_runtime_new(void *expr_schema, int max_thread_num,
void *expr_runtime_new(void *expr_schema, size_t max_thread_num,
struct maat_garbage_bin *garbage_bin,
struct log_handle *logger)
{
@@ -465,6 +469,8 @@ void *expr_runtime_new(void *expr_schema, int max_thread_num,
expr_rt->hit_cnt = alignment_int64_array_alloc(max_thread_num);
expr_rt->scan_cnt = alignment_int64_array_alloc(max_thread_num);
expr_rt->scan_bytes = alignment_int64_array_alloc(max_thread_num);
expr_rt->stream_num = alignment_int64_array_alloc(max_thread_num);
return expr_rt;
}
@@ -498,14 +504,29 @@ void expr_runtime_free(void *expr_runtime)
expr_rt->district_map = NULL;
}
if (expr_rt->scan_cnt != NULL) {
alignment_int64_array_free(expr_rt->scan_cnt);
expr_rt->scan_cnt = NULL;
}
if (expr_rt->scan_cpu_time != NULL) {
alignment_int64_array_free(expr_rt->scan_cpu_time);
expr_rt->scan_cpu_time = NULL;
}
if (expr_rt->hit_cnt != NULL) {
alignment_int64_array_free(expr_rt->hit_cnt);
expr_rt->hit_cnt = NULL;
}
if (expr_rt->scan_cnt != NULL) {
alignment_int64_array_free(expr_rt->scan_cnt);
expr_rt->scan_cnt = NULL;
if (expr_rt->stream_num != NULL) {
alignment_int64_array_free(expr_rt->stream_num);
expr_rt->stream_num = NULL;
}
if (expr_rt->scan_bytes != NULL) {
alignment_int64_array_free(expr_rt->scan_bytes);
expr_rt->scan_bytes = NULL;
}
FREE(expr_rt);
@@ -746,12 +767,13 @@ int expr_runtime_update(void *expr_runtime, void *expr_schema,
long long item_id = get_column_value(line, schema->item_id_column);
if (item_id < 0) {
expr_rt->update_err_cnt++;
return -1;
}
int is_valid = get_column_value(line, valid_column);
//printf("<expr_runtime_update> item_id:%lld is_valid:%d\n", item_id, is_valid);
if (is_valid < 0) {
expr_rt->update_err_cnt++;
return -1;
} else if (0 == is_valid) {
//delete
@@ -760,6 +782,7 @@ int expr_runtime_update(void *expr_runtime, void *expr_schema,
//add
struct expr_item *expr_item = expr_item_new(line, schema, expr_rt);
if (NULL == expr_item) {
expr_rt->update_err_cnt++;
return -1;
}
@@ -772,6 +795,7 @@ int expr_runtime_update(void *expr_runtime, void *expr_schema,
__FUNCTION__, __LINE__, table_name, item_id);
expr_item_free(expr_item);
maat_item_free(item);
expr_rt->update_err_cnt++;
return -1;
}
@@ -781,6 +805,7 @@ int expr_runtime_update(void *expr_runtime, void *expr_schema,
log_error(expr_rt->logger, MODULE_EXPR,
"[%s:%d] [table:%s] transform expr_item(item_id:%lld) to expr_rule failed",
__FUNCTION__, __LINE__, table_name, item_id);
expr_rt->update_err_cnt++;
return -1;
}
}
@@ -792,6 +817,7 @@ int expr_runtime_update(void *expr_runtime, void *expr_schema,
if (expr_rule != NULL) {
expr_rule_free(expr_rule);
}
expr_rt->update_err_cnt++;
return -1;
}
@@ -887,6 +913,16 @@ long long expr_runtime_rule_count(void *expr_runtime)
return expr_rt->rule_num;
}
long long expr_runtime_regex_rule_count(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
return expr_rt->regex_rule_num;
}
long long expr_runtime_get_version(void *expr_runtime)
{
if (NULL == expr_runtime) {
@@ -955,6 +991,8 @@ struct adapter_hs_stream *expr_runtime_stream_open(struct expr_runtime *expr_rt,
return NULL;
}
alignment_int64_array_add(expr_rt->stream_num, thread_id, 1);
return adapter_hs_stream_open(expr_rt->hs, thread_id);
}
@@ -1001,17 +1039,116 @@ int expr_runtime_stream_scan(struct expr_runtime *expr_rt, struct adapter_hs_str
return group_hit_cnt;
}
void expr_runtime_stream_close(struct adapter_hs_stream *s_handle)
void expr_runtime_stream_close(struct expr_runtime *expr_rt, int thread_id,
struct adapter_hs_stream *s_handle)
{
alignment_int64_array_add(expr_rt->stream_num, thread_id, -1);
adapter_hs_stream_close(s_handle);
}
void expr_runtime_scan_hit_inc(struct expr_runtime *expr_rt, int thread_id)
void expr_runtime_hit_inc(struct expr_runtime *expr_rt, int thread_id)
{
if (NULL == expr_rt || thread_id < 0) {
return;
}
alignment_int64_array_add(expr_rt->hit_cnt, thread_id, 1);
}
long long expr_runtime_scan_hit_sum(struct expr_runtime *expr_rt, int n_thread)
void expr_runtime_perf_stat(struct expr_runtime *expr_rt, size_t scan_len,
struct timespec *start, struct timespec *end,
int thread_id)
{
return alignment_int64_array_sum(expr_rt->hit_cnt, n_thread);
if (NULL == expr_rt || thread_id < 0) {
return;
}
alignment_int64_array_add(expr_rt->scan_cnt, thread_id, 1);
alignment_int64_array_add(expr_rt->scan_bytes, thread_id, scan_len);
if (start != NULL && end != NULL) {
if (NULL == expr_rt->scan_cpu_time) {
expr_rt->scan_cpu_time = alignment_int64_array_alloc(expr_rt->n_worker_thread);
}
long long consume_time = (end->tv_sec - start->tv_sec) * 1000000000 + end->tv_nsec - start->tv_nsec;
alignment_int64_array_add(expr_rt->scan_cpu_time, thread_id, consume_time);
}
}
long long expr_runtime_scan_count(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
long long sum = alignment_int64_array_sum(expr_rt->scan_cnt,
expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->scan_cnt, expr_rt->n_worker_thread);
return sum;
}
long long expr_runtime_scan_cpu_time(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
long long sum = alignment_int64_array_sum(expr_rt->scan_cpu_time,
expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->scan_cpu_time, expr_rt->n_worker_thread);
return sum;
}
long long expr_runtime_hit_count(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
long long sum = alignment_int64_array_sum(expr_rt->hit_cnt,
expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->hit_cnt, expr_rt->n_worker_thread);
return sum;
}
long long expr_runtime_update_err_count(void *expr_runtime)
{
if (NULL == expr_runtime) {
return 0;
}
struct expr_runtime *expr_rt = (struct expr_runtime *)expr_runtime;
return expr_rt->update_err_cnt;
}
long long expr_runtime_scan_bytes(struct expr_runtime *expr_rt)
{
if (NULL == expr_rt) {
return 0;
}
long long sum = alignment_int64_array_sum(expr_rt->scan_bytes,
expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->scan_bytes, expr_rt->n_worker_thread);
return sum;
}
long long expr_runtime_stream_num(struct expr_runtime *expr_rt)
{
if (NULL == expr_rt) {
return 0;
}
long long sum = alignment_int64_array_sum(expr_rt->stream_num, expr_rt->n_worker_thread);
alignment_int64_array_reset(expr_rt->stream_num, expr_rt->n_worker_thread);
return sum;
}