diff --git a/common/include/tfe_proxy.h b/common/include/tfe_proxy.h index 533880e..a151247 100644 --- a/common/include/tfe_proxy.h +++ b/common/include/tfe_proxy.h @@ -1,9 +1,13 @@ #pragma once +#include +#include struct tfe_proxy; const char * tfe_proxy_default_conffile(); const char * tfe_proxy_default_logger(); -unsigned int tfe_proxy_get_thread_count(); -struct event_base * tfe_proxy_get_evbase(unsigned int thread_id); +unsigned int tfe_proxy_get_work_thread_count(); +struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id); +struct event_base * tfe_proxy_get_gc_evbase(void); +screen_stat_handle_t tfe_proxy_get_fs_handle(void); diff --git a/common/include/tfe_utils.h b/common/include/tfe_utils.h index 235161e..38a83cb 100644 --- a/common/include/tfe_utils.h +++ b/common/include/tfe_utils.h @@ -73,9 +73,10 @@ do { MESA_handle_runtime_log(handler, RLOG_LV_DEBUG, "tfe", fmt, ##__VA_ARGS__); }) #endif -#define ATOMIC_INC(x) __atomic_fetch_add(x,1,__ATOMIC_RELAXED) -#define ATOMIC_DEC(x) __atomic_fetch_sub(x,1,__ATOMIC_RELAXED) -#define ATOMIC_READ(x) __atomic_fetch_add(x,0,__ATOMIC_RELAXED) +#define ATOMIC_INC(x) __atomic_fetch_add(x,1,__ATOMIC_RELAXED) +#define ATOMIC_DEC(x) __atomic_fetch_sub(x,1,__ATOMIC_RELAXED) +#define ATOMIC_READ(x) __atomic_fetch_add(x,0,__ATOMIC_RELAXED) +#define ATOMIC_ADD(x, y) __atomic_fetch_add(x,y,__ATOMIC_RELAXED) #ifndef MAX diff --git a/platform/include/internal/ssl_stream.h b/platform/include/internal/ssl_stream.h index 78c2acb..52be205 100644 --- a/platform/include/internal/ssl_stream.h +++ b/platform/include/internal/ssl_stream.h @@ -8,13 +8,12 @@ #include #include -#include struct ssl_stream; struct ssl_mgr; struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, struct event_base * ev_base_gc, - void * logger, screen_stat_handle_t fs); + void * logger); void ssl_manager_destroy(struct ssl_mgr * mgr); struct ssl_stream * ssl_upstream_create_result_release_stream(future_result_t * result); diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index c10c399..6df17b0 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -194,29 +194,35 @@ static void * __thread_ctx_entry(void * arg) return (void *)NULL; } -struct tfe_thread_ctx * __thread_ctx_create(struct tfe_proxy * proxy, unsigned int thread_id) + +void tfe_proxy_work_thread_create_ctx(struct tfe_proxy * proxy) { - struct tfe_thread_ctx * __thread_ctx = ALLOC(struct tfe_thread_ctx, 1); - assert(__thread_ctx != NULL); - - __thread_ctx->thread_id = thread_id; - __thread_ctx->evbase = event_base_new(); - - int ret = pthread_create(&__thread_ctx->thr, NULL, __thread_ctx_entry, (void *)__thread_ctx); - if (unlikely(ret < 0)) + unsigned int i=0; + for(i=0; inr_work_threads;i++) { - TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d: %s",errno, strerror(errno)); - goto __errout; + proxy->work_threads[i]=ALLOC(struct tfe_thread_ctx, 1); + proxy->work_threads[i]->thread_id = i; + proxy->work_threads[i]->evbase = event_base_new(); } - - return __thread_ctx; - -__errout: - if (__thread_ctx != NULL && __thread_ctx->evbase != NULL) event_base_free(__thread_ctx->evbase); - if (__thread_ctx != NULL) free(__thread_ctx); - return NULL; + return; +} +int tfe_proxy_work_thread_run(struct tfe_proxy * proxy) +{ + struct tfe_thread_ctx * __thread_ctx=NULL; + unsigned int i=0; + int ret=0; + for(i=0; inr_work_threads;i++) + { + __thread_ctx=proxy->work_threads[i]; + ret = pthread_create(&__thread_ctx->thr, NULL, __thread_ctx_entry, (void *)__thread_ctx); + if (unlikely(ret < 0)) + { + TFE_LOG_ERROR(proxy->logger, "Failed at pthread_create() for thread %d, error %d: %s", i, errno, strerror(errno)); + return -1; + } + } + return 0; } - int tfe_proxy_config(struct tfe_proxy * proxy, const char * profile) { /* Worker threads */ @@ -304,7 +310,7 @@ int main(int argc, char *argv[]) /* SSL INIT */ g_default_proxy->ssl_mgr_handler = ssl_manager_init(main_profile, "ssl", - g_default_proxy->evbase, g_default_logger, g_default_proxy->fs_handle); + g_default_proxy->evbase, g_default_logger); CHECK_OR_EXIT(g_default_proxy->ssl_mgr_handler, "Failed at init SSL manager. Exit."); for (size_t i = 0; i < (sizeof(signals) / sizeof(int)); i++) @@ -317,15 +323,9 @@ int main(int argc, char *argv[]) struct timeval gc_delay = {2, 0}; evtimer_add(g_default_proxy->gcev , &gc_delay); - /* WORKER THREAD */ - //TODO: Split ctx_create functioin to create and Run. - for(unsigned tid = 0; tid < g_default_proxy->nr_work_threads; tid++) - { - g_default_proxy->work_threads[tid] = __thread_ctx_create(g_default_proxy, tid); - CHECK_OR_EXIT(g_default_proxy->work_threads[tid], "Failed at creating thread %u", tid); - } - - + /* WORKER THREAD CTX Create */ + tfe_proxy_work_thread_create_ctx(g_default_proxy); + /* ACCEPTOR INIT */ g_default_proxy->kni_acceptor_handler = kni_acceptor_init(g_default_proxy, main_profile, g_default_logger); CHECK_OR_EXIT(g_default_proxy->kni_acceptor_handler, "Failed at init KNI acceptor. Exit. "); @@ -341,21 +341,31 @@ int main(int argc, char *argv[]) TFE_LOG_INFO(g_default_logger, "Plugin %s initialized. ", plugin_iter->symbol); } - - + ret=tfe_proxy_work_thread_run(g_default_proxy); + CHECK_OR_EXIT(ret==0, "Failed at creating thread. Exit."); + TFE_LOG_ERROR(g_default_logger, "Tango Frontend Engine initialized. "); event_base_dispatch(g_default_proxy->evbase); return 0; } -unsigned int tfe_proxy_get_thread_count(void) +unsigned int tfe_proxy_get_work_thread_count(void) { return g_default_proxy->nr_work_threads; } -struct event_base * tfe_proxy_get_evbase(unsigned int thread_id) +struct event_base * tfe_proxy_get_work_thread_evbase(unsigned int thread_id) { assert(thread_idnr_work_threads); return g_default_proxy->work_threads[thread_id]->evbase; } +struct event_base * tfe_proxy_get_gc_evbase(void) +{ + return g_default_proxy->evbase; +} + +screen_stat_handle_t tfe_proxy_get_fs_handle(void) +{ + return g_default_proxy->fs_handle; +} diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index e717026..bbca5a8 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -207,6 +208,22 @@ struct fs_spec enum ssl_stream_stat id; const char* name; }; +/* + * Garbage collection handler. + */ +static void +ssl_stream_gc_cb(evutil_socket_t fd, short what, void * arg) +{ + struct ssl_mgr *mgr=(struct ssl_mgr *)arg; + int i=0; + ssl_sess_cache_stat(mgr->up_sess_cache, &(mgr->stat_val[SSL_UP_CACHE_SZ]), &(mgr->stat_val[SSL_UP_CACHE_QUERY]), &(mgr->stat_val[SSL_UP_CACHE_HIT])); + ssl_sess_cache_stat(mgr->down_sess_cache, &(mgr->stat_val[SSL_DOWN_CACHE_SZ]), &(mgr->stat_val[SSL_DOWN_CACHE_QUERY]), &(mgr->stat_val[SSL_DOWN_CACHE_HIT])); + for(i=0;ifs_handle, mgr->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(mgr->stat_val[i]))); + } + return; +} void ssl_stat_init(struct ssl_mgr * mgr) { int i=0; @@ -288,6 +305,11 @@ void ssl_stat_init(struct ssl_mgr * mgr) FS_CALC_CURRENT, "dtkt_hit"); } + + struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. + mgr->gcev = event_new(mgr->ev_base_gc, -1, EV_PERSIST, ssl_stream_gc_cb, mgr); + evtimer_add(mgr->gcev, &gc_delay); + return; } static SSL * downstream_ssl_create(struct ssl_mgr * mgr, struct keyring * crt); @@ -424,27 +446,11 @@ void ssl_manager_destroy(struct ssl_mgr * mgr) } free(mgr); } -/* - * Garbage collection handler. - */ -static void -ssl_stream_gc_cb(evutil_socket_t fd, short what, void * arg) -{ - struct ssl_mgr *mgr=(struct ssl_mgr *)arg; - int i=0; - ssl_sess_cache_stat(mgr->up_sess_cache, &(mgr->stat_val[SSL_UP_CACHE_SZ]), &(mgr->stat_val[SSL_UP_CACHE_QUERY]), &(mgr->stat_val[SSL_UP_CACHE_HIT])); - ssl_sess_cache_stat(mgr->down_sess_cache, &(mgr->stat_val[SSL_DOWN_CACHE_SZ]), &(mgr->stat_val[SSL_DOWN_CACHE_QUERY]), &(mgr->stat_val[SSL_DOWN_CACHE_HIT])); - for(i=0;ifs_handle, mgr->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(mgr->stat_val[i]))); - } - return; -} + struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, - struct event_base * ev_base_gc, void * logger, screen_stat_handle_t fs) + struct event_base * ev_base_gc, void * logger) { - struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. unsigned char key_name[]="!mesalab-tfe3a~&"; unsigned char aes_key_def[]={0xC5,0xAC,0xC1,0xA6,0xB2,0xBB,0xCA,0xC7,0xE3,0xBE,0xE3,0xB2,0xC6,0xA3,0xB1,0xB9 ,0xA3,0xAC,0xB6,0xF8,0xCA,0xC7,0xD1,0xDB,0xBE,0xA6,0xC0,0xEF,0xD3,0xD0,0xB9,0x84}; @@ -456,7 +462,6 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section char version_str[TFE_SYMBOL_MAX]; mgr->logger = logger; mgr->ev_base_gc=ev_base_gc; - mgr->fs_handle=fs; MESA_load_profile_string_def(ini_profile, section, "ssl_min_version", version_str, sizeof(version_str), "ssl3"); mgr->ssl_min_version = sslver_str2num(version_str); MESA_load_profile_string_def(ini_profile, section, "ssl_max_version", version_str, sizeof(version_str), "tls12"); @@ -538,15 +543,10 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section } memcpy(mgr->ssl_session_context, "mesa-tfe", sizeof(mgr->ssl_session_context)); - mgr->fs_handle=fs; + mgr->fs_handle=tfe_proxy_get_fs_handle(); ssl_stat_init(mgr); - mgr->gcev = event_new(mgr->ev_base_gc, -1, EV_PERSIST, ssl_stream_gc_cb, mgr); - if (!mgr->gcev) - { - goto error_out; - } - evtimer_add(mgr->gcev, &gc_delay); + return mgr; @@ -1490,6 +1490,7 @@ retry: if (ctx->retries++ >= MAX_NET_RETRIES) { + /* struct tfe_stream_addr* addr=tfe_stream_addr_create_by_fd(fd, ctx->s_stream->dir); char* addr_string=tfe_stream_addr_to_str(addr); TFE_LOG_ERROR(logger, "Failed to shutdown %s SSL connection cleanly: %s " @@ -1498,6 +1499,7 @@ retry: addr_string, fd); tfe_stream_addr_free(addr); free(addr_string); + */ if(ctx->s_stream->dir==CONN_DIR_DOWNSTREAM) { ATOMIC_INC(&(mgr->stat_val[SSL_DOWN_DIRTY_CLOSED])); diff --git a/plugin/business/pangu-http/pangu_http.cpp b/plugin/business/pangu-http/pangu_http.cpp index 6610b7c..784ced7 100644 --- a/plugin/business/pangu-http/pangu_http.cpp +++ b/plugin/business/pangu-http/pangu_http.cpp @@ -53,6 +53,17 @@ enum scan_table __SCAN_TABLE_MAX }; +enum pangu_http_stat +{ + STAT_SESSION, + STAT_LOG_NUM, + STAT_ACTION_MONIT, + STAT_ACTION_REJECT, + STAT_ACTION_REDIRECT, + STAT_ACTION_REPLACE, + STAT_ACTION_WHITELSIT, + __PG_STAT_MAX +}; struct pangu_rt { Maat_feather_t maat; @@ -64,7 +75,15 @@ struct pangu_rt ctemplate::Template * tpl_403, * tpl_404, * tpl_451; char * reject_page; int page_size; + + int cache_enabled; struct cache_handle* cache; + + screen_stat_handle_t fs_handle; + long long stat_val[__PG_STAT_MAX]; + int fs_id[__PG_STAT_MAX]; + struct event_base* gc_evbase; + struct event* gcev; }; struct pangu_rt * g_pangu_rt; @@ -148,13 +167,51 @@ error_out: Maat_burn_feather(target); return NULL; } +static void pangu_http_gc_cb(evutil_socket_t fd, short what, void * arg) +{ + int i=0; + + for(i=0;i<__PG_STAT_MAX;i++) + { + FS_operate(g_pangu_rt->fs_handle, g_pangu_rt->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(g_pangu_rt->stat_val[i]))); + } + return; +} + +static void pangu_http_stat_init(struct pangu_rt * pangu_runtime) +{ + int i=0; + struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. + const char* spec[__PG_STAT_MAX]={0}; + spec[STAT_SESSION]="http_sess"; + spec[STAT_LOG_NUM]="log_num"; + spec[STAT_ACTION_MONIT]="monit"; + spec[STAT_ACTION_REJECT]="reject"; + spec[STAT_ACTION_REDIRECT]="redirect"; + spec[STAT_ACTION_REPLACE]="replace"; + spec[STAT_ACTION_WHITELSIT]="whitelist"; + + for(i=0;i<__PG_STAT_MAX;i++) + { + if(spec[i]!=NULL) + { + pangu_runtime->fs_id[i]=FS_register(pangu_runtime->fs_handle, FS_STYLE_STATUS, FS_CALC_CURRENT, spec[i]); + } + } + g_pangu_rt->gcev = event_new(pangu_runtime->gc_evbase, -1, EV_PERSIST, pangu_http_gc_cb, NULL); + evtimer_add(g_pangu_rt->gcev, &gc_delay); + + return; +} + int pangu_http_init(struct tfe_proxy * proxy) { const char * profile = "./pangu_conf/pangu_pxy.conf"; const char * logfile = "./log/pangu_pxy.log"; g_pangu_rt = ALLOC(struct pangu_rt, 1); - g_pangu_rt->thread_num = tfe_proxy_get_thread_count(); + g_pangu_rt->thread_num = tfe_proxy_get_work_thread_count(); + g_pangu_rt->gc_evbase=tfe_proxy_get_gc_evbase(); MESA_load_profile_int_def(profile, "DEBUG", "LOG_LEVEL", &(g_pangu_rt->log_level), 0); g_pangu_rt->local_logger = MESA_create_runtime_log_handle(logfile, g_pangu_rt->log_level); g_pangu_rt->send_logger = pangu_log_handle_create(profile, "LOG", g_pangu_rt->local_logger); @@ -162,6 +219,11 @@ int pangu_http_init(struct tfe_proxy * proxy) { goto error_out; } + g_pangu_rt->fs_handle = tfe_proxy_get_fs_handle(); + pangu_http_stat_init(g_pangu_rt); + + + g_pangu_rt->maat = create_maat_feather(profile, "MAAT", g_pangu_rt->thread_num, g_pangu_rt->local_logger); if (!g_pangu_rt->maat) { @@ -199,7 +261,17 @@ int pangu_http_init(struct tfe_proxy * proxy) "./pangu_conf/template/HTTP451.html"); g_pangu_rt->tpl_451 = ctemplate::Template::GetTemplate(page_path, ctemplate::DO_NOT_STRIP); - g_pangu_rt->cache = create_web_cache_handle(profile, "TANGO_CACHE", g_pangu_rt->local_logger); + MESA_load_profile_int_def(profile, "TANGO_CACHE", "enable_cache", &(g_pangu_rt->cache_enabled), 1); + if(g_pangu_rt->cache_enabled) + { + g_pangu_rt->cache = create_web_cache_handle(profile, "TANGO_CACHE", g_pangu_rt->gc_evbase, g_pangu_rt->local_logger); + if(!g_pangu_rt->cache) + { + TFE_LOG_INFO(NULL, "Tango Cache init failed."); + goto error_out; + } + TFE_LOG_INFO(NULL, "Tango Cache Enabled."); + } TFE_LOG_INFO(NULL, "Pangu HTTP init success."); return 0; @@ -207,25 +279,6 @@ error_out: TFE_LOG_ERROR(NULL, "Pangu HTTP init failed."); return -1; } -static void _wrap_std_field_write(struct tfe_http_half * half, enum tfe_http_std_field field_id, const char * value) -{ - struct http_field_name tmp_name; - tmp_name.field_id = field_id; - tmp_name.field_name = NULL; - tfe_http_field_write(half, &tmp_name, value); - return; -} -#if 0 -static void _wrap_non_std_field_write(struct tfe_http_half * half, const char* field_name, const char * value) -{ - struct http_field_name tmp_name; - tmp_name.field_id=TFE_HTTP_UNKNOWN_FIELD; - //todo remove force convert after tfe_http.h improved. - tmp_name.field_name=(char*)field_name; - tfe_http_field_write(half, &tmp_name, value); - return; -} -#endif struct replace_ctx { @@ -450,7 +503,7 @@ static void cache_query_on_succ(future_result_t * result, void * user) tfe_http_session_resume(ctx->ref_session); ctx->cached_response=tfe_http_session_response_create(ctx->ref_session, 200); - tfe_http_nonstd_field_write(ctx->cached_response, "X-Cache-Lookup", "Hit From MESA-TFE"); + tfe_http_nonstd_field_write(ctx->cached_response, "X-Cache-Lookup", "Hit From TFE"); tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_TYPE, meta->content_type); snprintf(temp, sizeof(temp), "%lu", meta->content_length); tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_LENGTH, temp); @@ -469,8 +522,8 @@ static void cache_query_on_succ(future_result_t * result, void * user) case CACHE_QUERY_RESULT_END: assert(ctx->cached_response!=NULL); ctx->cache_query_status=WEB_CACHE_HIT; - tfe_http_half_write_body_end(ctx->cached_response); printf("cache query hit: %s\n", ctx->ref_session->req->req_spec.url); + tfe_http_half_write_body_end(ctx->cached_response); //ownership has been transferred to http session, set to NULL. ctx->cached_response=NULL; assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz); @@ -846,16 +899,26 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h switch (ctx->action) { case PG_ACTION_NONE: + break; case PG_ACTION_MONIT: + ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_MONIT])); //send log on close. break; - case PG_ACTION_REJECT: http_reject(session, events, ctx); + case PG_ACTION_REJECT: + http_reject(session, events, ctx); + ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_REJECT])); break; - case PG_ACTION_REDIRECT: http_redirect(session, events, ctx); + case PG_ACTION_REDIRECT: + http_redirect(session, events, ctx); + ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_REDIRECT])); break; - case PG_ACTION_REPLACE: http_replace(stream, session, events, body_frag, frag_size, ctx); + case PG_ACTION_REPLACE: + http_replace(stream, session, events, body_frag, frag_size, ctx); + ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_REPLACE])); break; - case PG_ACTION_WHITELIST: tfe_http_session_detach(session); + case PG_ACTION_WHITELIST: + tfe_http_session_detach(session); + ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_ACTION_WHITELSIT])); break; default: assert(0); break; @@ -911,6 +974,7 @@ void pangu_on_http_begin(const struct tfe_stream * stream, struct ipaddr sapp_addr; int hit_cnt = 0; assert(ctx == NULL); + ATOMIC_INC(&(g_pangu_rt->stat_val[STAT_SESSION])); ctx = pangu_http_ctx_new(thread_id); addr_tfe2sapp(stream->addr, &sapp_addr); hit_cnt = Maat_scan_proto_addr(g_pangu_rt->maat, g_pangu_rt->scan_table_id[PXY_CTRL_IP], &sapp_addr, 0, @@ -933,7 +997,7 @@ void pangu_on_http_end(const struct tfe_stream * stream, const struct tfe_http_session * session, unsigned int thread_id, void ** pme) { struct pangu_http_ctx * ctx = *(struct pangu_http_ctx **) pme; - int i=0, j=0; + int i=0, j=0,ret=0; if(ctx->action == PG_ACTION_REPLACE && ctx->rep_ctx->actually_replaced==0) { for(i=0; i< ctx->n_enforce; i++) @@ -957,7 +1021,8 @@ void pangu_on_http_end(const struct tfe_stream * stream, struct pangu_log log_msg = {.stream=stream, .http=session, .result=ctx->enforce_rules, .result_num=ctx->n_enforce}; if (ctx->action != PG_ACTION_NONE&& !(ctx->action == PG_ACTION_REPLACE && ctx->n_enforce==1 && ctx->rep_ctx->actually_replaced==0)) { - pangu_send_log(g_pangu_rt->send_logger, &log_msg); + ret=pangu_send_log(g_pangu_rt->send_logger, &log_msg); + ATOMIC_ADD(&(g_pangu_rt->stat_val[STAT_LOG_NUM]), ret); } pangu_http_ctx_free(ctx); *pme = NULL; @@ -984,16 +1049,17 @@ void pangu_on_http_data(const struct tfe_stream * stream, const struct tfe_http_ { return; } - - if(events & EV_HTTP_REQ_HDR && !ctx->resume_from_cache_query) + if(g_pangu_rt->cache_enabled) { - cache_query(session, thread_id, ctx); - } - if(!tfe_http_in_request(events)) - { - cache_update(session, events, body_frag, frag_size, thread_id, ctx); - } - + if(events & EV_HTTP_REQ_HDR && !ctx->resume_from_cache_query) + { + cache_query(session, thread_id, ctx); + } + if(!tfe_http_in_request(events)) + { + cache_update(session, events, body_frag, frag_size, thread_id, ctx); + } + } return; } diff --git a/plugin/business/pangu-http/pangu_logger.cpp b/plugin/business/pangu-http/pangu_logger.cpp index 5f34062..9053a59 100644 --- a/plugin/business/pangu-http/pangu_logger.cpp +++ b/plugin/business/pangu-http/pangu_logger.cpp @@ -21,7 +21,6 @@ struct json_spec const char *log_filed_name; enum tfe_http_std_field field_id; }; - struct pangu_logger { char local_ip_str[TFE_SYMBOL_MAX]; @@ -149,6 +148,7 @@ int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg) cJSON *common_obj=NULL, *per_hit_obj=NULL; char* log_payload=NULL; int kafka_status=0; + int send_cnt=0; time_t cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; @@ -235,8 +235,9 @@ int pangu_send_log(struct pangu_logger* handle, const struct pangu_log* log_msg) { TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); } + send_cnt++; } cJSON_free(common_obj); - return 0; + return send_cnt; } diff --git a/plugin/business/pangu-http/pangu_web_cache.cpp b/plugin/business/pangu-http/pangu_web_cache.cpp index 7f74138..1d43815 100644 --- a/plugin/business/pangu-http/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/pangu_web_cache.cpp @@ -7,32 +7,222 @@ #include #include +#include +#include #include #include +enum cache_stat_field +{ + STAT_CACHE_QUERY, + STAT_CACHE_QUERY_NOT_APPLICABLE, + STAT_CACHE_QUERY_HIT, + STAT_CACHE_QUERY_BYTES, + STAT_CACHE_OVERRIDE_QUERY, + STAT_CACHE_OVERRIDE_HIT, + STAT_CACHE_OVERRIDE_BYTES, + STAT_CACHE_QUERY_ERR, + STAT_CACHE_UPLOAD_CNT, + STAT_CACHE_UPLOAD_OVERRIDE, + STAT_CACHE_UPLOAD_FORBIDEN, + STAT_CACHE_UPLOAD_ABANDON, + STAT_CACHE_UPLOAD_ERR, + STAT_CACHE_UPLOAD_BYTES, + STAT_CACHE_MEMORY, + STAT_CACHE_ACTIVE_SESSION, + STAT_CACHE_QUERY_HIT_OJB_SIZE, + STAT_CACHE_UPLOAD_OBJ_SIZE, + STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE, + STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE, + __CACHE_STAT_MAX +}; struct cache_handle { unsigned int thread_count; + int cache_undefined_obj_enabled; + size_t cache_undefined_obj_min_size; + int minimum_cache_seconds; struct tango_cache_instance **clients; + screen_stat_handle_t fs_handle; + long long stat_val[__CACHE_STAT_MAX]; + int fs_id[__CACHE_STAT_MAX]; + struct event_base* gc_evbase; + struct event* gcev; }; struct cache_update_context { + struct cache_handle* ref_cache_handle; struct tango_cache_ctx * write_ctx; }; -struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, void *logger) +static void cache_gc_cb(evutil_socket_t fd, short what, void * arg) { - struct cache_handle* handle=ALLOC(struct cache_handle, 1); - handle->thread_count=tfe_proxy_get_thread_count(); - handle->clients=ALLOC(struct tango_cache_instance *, handle->thread_count); - int i=0; - for(i=0; ithread_count; i++) + struct cache_handle* cache=(struct cache_handle *)arg; + struct cache_statistics client_stat_sum, client_stat; + memset(&client_stat_sum, 0, sizeof(client_stat_sum)); + long long *val_sum = (long long *)&client_stat_sum; + long long *val = NULL; + int i=0, j=0; + for(i=0; ithread_count;i++) { - handle->clients[i]=tango_cache_instance_new(tfe_proxy_get_evbase(i), profile_path, section, logger); + tango_cache_get_statistics(cache->clients[i], &client_stat); + val=(long long*)&client_stat; + for(j=0; jstat_val[i]))!=0) + { + switch(i) + { + case STAT_CACHE_UPLOAD_BYTES: + case STAT_CACHE_QUERY_BYTES: + case STAT_CACHE_OVERRIDE_BYTES: + //translate bytes to mega bytes. + FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))/(1024*1024)); + break; + default: + FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))); + break; + } + } + } + + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY], 0, FS_OP_SET, client_stat_sum.get_recv_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_QUERY_ERR], 0, FS_OP_SET, client_stat_sum.get_error_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ERR], 0, FS_OP_SET, client_stat_sum.put_error_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024)); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION], 0, FS_OP_SET, client_stat_sum.session_num); + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_UPLOAD_ABANDON], 0, FS_OP_SET, client_stat_sum.totaldrop_num); + FS_passive_output(cache->fs_handle); + return; +} +struct cache_stat_sepc +{ + const char* name; + enum field_dsp_style_t style; + enum field_calc_algo calc_type; +}; + +static void set_stat_spec(struct cache_stat_sepc* spec, const char* name, enum field_dsp_style_t style, enum field_calc_algo calc_type) +{ + spec->name=name; + spec->style=style; + spec->calc_type=calc_type; + return; +} +void cache_stat_init(struct cache_handle* cache) +{ + const char* fieldstat_output="./cache.fieldstat"; + const char* app_name="tango_cache"; + const char* obj_size_bins_KB="10,100,1000,10000"; + + int value=0, i=0; + screen_stat_handle_t fs_handle=NULL; + fs_handle=FS_create_handle(); + FS_set_para(fs_handle, OUTPUT_DEVICE, fieldstat_output, strlen(fieldstat_output)+1); + value=1; + FS_set_para(fs_handle, PRINT_MODE, &value, sizeof(value)); + value=0; + FS_set_para(fs_handle, CREATE_THREAD, &value, sizeof(value)); + FS_set_para(fs_handle, APP_NAME, app_name, strlen(app_name)+1); + FS_set_para(fs_handle, HISTOGRAM_GLOBAL_BINS, obj_size_bins_KB, strlen(obj_size_bins_KB)+1); + + cache->fs_handle=fs_handle; + + struct cache_stat_sepc spec[__CACHE_STAT_MAX]; + + set_stat_spec(&spec[STAT_CACHE_QUERY], "cache_query",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_QUERY_NOT_APPLICABLE], "qry_not_allow",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_QUERY_HIT], "hit_num",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_QUERY_BYTES], "downloaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_OVERRIDE_QUERY], "or_qry",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT], "or_hit",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_OVERRIDE_BYTES], "or_download(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_QUERY_ERR], "query_err",FS_STYLE_STATUS, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_CNT], "cache_upload",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_OVERRIDE], "or_upload",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_FORBIDEN], "upload_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_ABANDON], "upload_abandon",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_ERR], "upload_err",FS_STYLE_STATUS, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_BYTES], "uploaded(MB)",FS_STYLE_FIELD, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_MEMORY], "used_mem(MB)",FS_STYLE_STATUS, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_ACTIVE_SESSION], "active_sess",FS_STYLE_STATUS, FS_CALC_CURRENT); + + set_stat_spec(&spec[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], "or_hit_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_QUERY_HIT_OJB_SIZE], "hitted_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_UPLOAD_OBJ_SIZE], "cached_obj(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); + set_stat_spec(&spec[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], "or_cached(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); + + + for(i=0;i<__CACHE_STAT_MAX;i++) + { + cache->fs_id[i]=FS_register(cache->fs_handle, spec[i].style, spec[i].calc_type, spec[i].name); + } +// value=cache->fs_id[STAT_CACHE_QUERY_HIT]; +// FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); + + FS_register_ratio(cache->fs_handle, + cache->fs_id[STAT_CACHE_QUERY_HIT], + cache->fs_id[STAT_CACHE_QUERY], + 1, + FS_STYLE_STATUS, + FS_CALC_CURRENT, + "cache_hit"); + + value=cache->fs_id[STAT_CACHE_OVERRIDE_HIT]; + FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); + + FS_register_ratio(cache->fs_handle, + cache->fs_id[STAT_CACHE_OVERRIDE_HIT], + cache->fs_id[STAT_CACHE_OVERRIDE_QUERY], + 1, + FS_STYLE_STATUS, + FS_CALC_CURRENT, + "override_hit"); + + + FS_start(cache->fs_handle); + + struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. + cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, cache_gc_cb, cache); + evtimer_add(cache->gcev, &gc_delay); + return; +} +struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, void *logger) +{ + struct cache_handle* cache=ALLOC(struct cache_handle, 1); + int temp=0; + cache->thread_count=tfe_proxy_get_work_thread_count(); + cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count); + int i=0; + for(i=0; ithread_count; i++) + { + cache->clients[i]=tango_cache_instance_new(tfe_proxy_get_work_thread_evbase(i), profile_path, section, logger); + if(cache->clients[i]==NULL) + { + goto error_out; + } + } + + MESA_load_profile_int_def(profile_path, section, "cache_undefined_obj", &(cache->cache_undefined_obj_enabled), 1); + MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024); + cache->cache_undefined_obj_min_size=temp; + MESA_load_profile_int_def(profile_path, section, "cache_minimum_time_override", &(cache->minimum_cache_seconds), 60*5); + + cache->gc_evbase=gc_evbase; + cache_stat_init(cache); + return cache; +error_out: + free(cache); + return NULL; } static char* read_http1_hdr(const char* hdr, const char* field_name) @@ -99,18 +289,85 @@ size_t cache_query_result_get_data(future_result_t * result, const unsigned char *pp_data=(const unsigned char*)cache_result->data_frag; return cache_result->size; } +struct cache_query_context +{ + struct cache_handle* ref_handle; + int is_undefined_obj; + struct future* f_tango_cache_fetch; +}; +void cache_query_ctx_free_cb(void* p) +{ + struct cache_query_context* ctx=(struct cache_query_context*)p; + future_destroy(ctx->f_tango_cache_fetch); + ctx->f_tango_cache_fetch=NULL; + free(ctx); +} +static void wrap_cache_query_on_succ(future_result_t * result, void * user) +{ + struct promise * p = (struct promise *) user; + struct cache_query_context* ctx=(struct cache_query_context*)promise_get_ctx(p); + struct tango_cache_result* _result=tango_cache_read_result(result); + enum cache_query_result_type type=cache_query_result_get_type(result); + + switch(_result->type) + { + case RESULT_TYPE_HEADER: + if(ctx->is_undefined_obj) + { + ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_OVERRIDE_HIT])); + FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_OVERRIDE_HIT_OBJ_SIZE], 0, FS_OP_SET, _result->tlength/1024); + } + ATOMIC_INC(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_HIT])); + FS_operate(ctx->ref_handle->fs_handle, ctx->ref_handle->fs_id[STAT_CACHE_QUERY_HIT_OJB_SIZE], 0, FS_OP_SET, _result->tlength/1024); + break; + case RESULT_TYPE_END: + case RESULT_TYPE_MISS: + //last call. + promise_dettach_ctx(p); + cache_query_ctx_free_cb(ctx); + break; + case RESULT_TYPE_BODY: + ATOMIC_ADD(&(ctx->ref_handle->stat_val[STAT_CACHE_QUERY_BYTES]), _result->size); + break; + default: + break; + } + promise_success(p, result); + + return; +} +static void wrap_cache_query_on_fail(enum e_future_error err, const char * what, void * user) +{ + struct promise * p = (struct promise *) user; + struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p); + promise_failed(p, err, what); + cache_query_ctx_free_cb(ctx); + return; +} enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_half * request, struct future* f) { struct request_freshness req_fresshness; - enum cache_pending_action get_action; - int ret=0; + enum cache_pending_action get_action; + struct cache_query_context* query_ctx=NULL; + struct promise* p=NULL; + struct future* _f=NULL; + int ret=0, is_undefined_obj=0; get_action=tfe_cache_get_pending(request, &req_fresshness); switch(get_action) { case UNDEFINED: - case FORBIDDEN: + if(!handle->cache_undefined_obj_enabled) + { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE])); + return WEB_CACHE_NOT_APPLICABLE; + } + is_undefined_obj=1; + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_OVERRIDE_QUERY])); + break; + case FORBIDDEN: + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_QUERY_NOT_APPLICABLE])); return WEB_CACHE_NOT_APPLICABLE; case VERIFY: case ALLOWED: @@ -124,7 +381,13 @@ enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsig memset(&meta, 0, sizeof(meta)); meta.url=request->req_spec.url; memcpy(&(meta.get), &req_fresshness, sizeof(meta.get)); - ret=tango_cache_fetch_object(handle->clients[thread_id], f, &meta); + query_ctx=ALLOC(struct cache_query_context, 1); + query_ctx->ref_handle=handle; + query_ctx->is_undefined_obj=is_undefined_obj; + p=future_to_promise(f); + promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb); + query_ctx->f_tango_cache_fetch=future_create("wrap_cache_qry", wrap_cache_query_on_succ, wrap_cache_query_on_fail, p); + ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta); assert(ret==0); return WEB_CACHE_QUERING; } @@ -138,24 +401,28 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, struct tango_cache_ctx *write_ctx=NULL; char buffer[TFE_STRING_MAX]; const char* value=NULL; - int i=0; + int i=0, is_undefined_obj=0; size_t content_len=0; - if(session->resp->resp_spec.content_length!=NULL) + if(!session->resp->resp_spec.content_length) { - sscanf(session->resp->resp_spec.content_length, "%lu", &content_len); + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); + return NULL; } + sscanf(session->resp->resp_spec.content_length, "%lu", &content_len); put_action=tfe_cache_put_pending(session->resp, &resp_freshness); switch(put_action){ case FORBIDDEN: case VERIFY: + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_FORBIDEN])); return NULL; case ALLOWED: break; case UNDEFINED: - if(content_len<100*1024) + if(handle->cache_undefined_obj_enabled && content_lencache_undefined_obj_min_size) { return NULL; } + is_undefined_obj=1; break; default: assert(0); @@ -170,19 +437,26 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle, i++; memcpy(&meta.put, &resp_freshness, sizeof(resp_freshness)); write_ctx=tango_cache_update_start(handle->clients[thread_id], NULL, &meta); - if(write_ctx==NULL) + if(write_ctx==NULL)//exceed maximum cache memory size. { return NULL; } - + if(is_undefined_obj) + { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_UPLOAD_OVERRIDE])); + FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_OVERRIDE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024); + } + FS_operate(handle->fs_handle,handle->fs_id[STAT_CACHE_UPLOAD_OBJ_SIZE], 0, FS_OP_SET, content_len/1024); update_ctx=ALLOC(struct cache_update_context, 1); update_ctx->write_ctx=write_ctx; + update_ctx->ref_cache_handle=handle; return update_ctx; } void web_cache_update(struct cache_update_context* ctx, const unsigned char * body_frag, size_t frag_size) { tango_cache_update_frag_data(ctx->write_ctx, (const char*)body_frag, frag_size); + ATOMIC_ADD(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_UPLOAD_BYTES]), frag_size); return; } void web_cache_update_end(struct cache_update_context* ctx) diff --git a/plugin/business/pangu-http/pangu_web_cache.h b/plugin/business/pangu-http/pangu_web_cache.h index 6d9e038..c910922 100644 --- a/plugin/business/pangu-http/pangu_web_cache.h +++ b/plugin/business/pangu-http/pangu_web_cache.h @@ -12,7 +12,7 @@ enum cache_query_status WEB_CACHE_HIT }; struct cache_handle; -struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, void *logger); +struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, void *logger); struct cached_meta { size_t content_length;