diff --git a/common/src/tfe_future.cpp b/common/src/tfe_future.cpp index 00488fd..3e3fc9f 100644 --- a/common/src/tfe_future.cpp +++ b/common/src/tfe_future.cpp @@ -47,7 +47,7 @@ struct promise { struct future f; void * ctx; - char has_timeout; + char has_timeout; char ref_cnt; char may_success_many_times; promise_ctx_destroy_cb * cb_ctx_destroy; @@ -82,7 +82,7 @@ void future_promise_library_init(const char* profile) } } if(g_FP_instance.no_stats) - { + { g_is_FP_init=1; return; } @@ -101,7 +101,7 @@ void future_promise_library_init(const char* profile) g_FP_instance.name_table=htable; screen_stat_handle_t fs=NULL; - const char* stat_path="./future.fieldstat"; + const char* stat_path="log/future.fs2"; const char* app_name="FP"; fs=FS_create_handle(); FS_set_para(fs, APP_NAME, app_name, strlen(app_name)+1); @@ -135,7 +135,7 @@ static void __promise_destroy(struct promise *p) if (p->cb_ctx_destroy != NULL) { p->cb_ctx_destroy(p->ctx); - } + } if(!g_FP_instance.no_stats) FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_SUB, 1); memset(p, 0, sizeof(struct promise)); free(p); diff --git a/conf/pangu/pangu_pxy.conf b/conf/pangu/pangu_pxy.conf index edbe3cc..9ec6482 100644 --- a/conf/pangu/pangu_pxy.conf +++ b/conf/pangu/pangu_pxy.conf @@ -33,16 +33,28 @@ cache_store_object_way=2 redis_cache_object_size=1024000 #If CACHE_STORE_OBJECT_WAY is not 0, we will use redis to store meta and object. redis_cluster_addrs=10.4.20.211:9001,10.4.20.212:9001,10.4.20.213:9001,10.4.20.214:9001,10.4.20.215:9001,10.4.20.216:9001,10.4.20.217:9001,10.4.20.218:9001 + #Configs of WiredLB for Minios load balancer. -#WIREDLB_OVERRIDE=1 +wiredlb_override=1 wiredlb_health_port=42310 +wiredlb_topic=MinioFileLog +wiredlb_datacenter=k18consul-tse +wiredlb_health_port=52102 +wiredlb_group=FileLog + +log_fsstat_appname=tango_log_file +log_fsstat_filepath=./log/tango_log_file.fs2 +log_fsstat_interval=10 +log_fsstat_trig=1 +log_fsstat_dst_ip=10.4.20.202 +log_fsstat_dst_port=8125 [maat] # 0:json 1: redis 2: iris maat_input_mode=1 table_info=resource/pangu/table_info.conf json_cfg_file=resource/ -stat_file=log/pangu_scan.status +stat_file=log/pangu_scan.fs2 full_cfg_dir=pangu_policy/full/index/ inc_cfg_dir=pangu_policy/inc/index/ maat_redis_server=10.4.34.4 diff --git a/platform/include/internal/platform.h b/platform/include/internal/platform.h index eb0a222..35f6647 100644 --- a/platform/include/internal/platform.h +++ b/platform/include/internal/platform.h @@ -91,6 +91,7 @@ struct tfe_stream_private struct ssl_stream * ssl_upstream; }; + uint8_t is_first_call_rxcb; uint8_t is_plugin_opened; int calling_idx; diff --git a/platform/include/internal/proxy.h b/platform/include/internal/proxy.h index 7ce9d79..b575741 100644 --- a/platform/include/internal/proxy.h +++ b/platform/include/internal/proxy.h @@ -17,10 +17,7 @@ enum TFE_STAT_FIELD /* FDs */ STAT_FD_OPEN_BY_KNI_ACCEPT, STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL, - /* FDs */ - STAT_FD_INSTANT_CLOSE, - STAT_FD_DEFER_CLOSE_IN_QUEUE, - STAT_FD_DEFER_CLOSE_SUCCESS, + STAT_FD_CLOSE, /* Stream */ STAT_STREAM_OPEN, @@ -31,14 +28,16 @@ enum TFE_STAT_FIELD STAT_STREAM_CLS_UP_ERR, STAT_STREAM_CLS_KILL, - /* Stream Protocol */ + /* Action */ + STAT_STREAM_INTERCEPT, + STAT_STREAM_BYPASS, + STAT_STREAM_INCPT_BYTES, + STAT_STREAM_INCPT_DOWN_BYTES, + STAT_STREAM_INCPT_UP_BYTES, + + /* Protocol */ STAT_STREAM_TCP_PLAIN, STAT_STREAM_TCP_SSL, - - /* RX DATA */ - STAT_STREAM_DOWN_RX_BYTES, - STAT_STREAM_UP_RX_BYTES, - TFE_STAT_MAX }; diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index 90a653b..c98f334 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -307,9 +307,7 @@ static const char * __str_stat_spec_map[] = [STAT_SIGPIPE] = "SIGPIPE", [STAT_FD_OPEN_BY_KNI_ACCEPT] = "fd_rx", [STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "fd_rx_err", - [STAT_FD_INSTANT_CLOSE] = "fd_inst_cls", - [STAT_FD_DEFER_CLOSE_IN_QUEUE] = "fd_dfr_cls", - [STAT_FD_DEFER_CLOSE_SUCCESS] = "fd_dfr_clsd", + [STAT_FD_CLOSE] = "fd_inst_cls", [STAT_STREAM_OPEN] = "stm_open", [STAT_STREAM_CLS] = "stm_cls", [STAT_STREAM_CLS_DOWN_EOF] = "dstm_eof", @@ -317,16 +315,19 @@ static const char * __str_stat_spec_map[] = [STAT_STREAM_CLS_DOWN_ERR] = "dstm_err", [STAT_STREAM_CLS_UP_ERR] = "ustm_err", [STAT_STREAM_CLS_KILL] = "stm_kill", + [STAT_STREAM_INTERCEPT] = "stm_incpt", + [STAT_STREAM_BYPASS] = "stm_byp", + [STAT_STREAM_INCPT_BYTES] = "stm_incpt_B", + [STAT_STREAM_INCPT_DOWN_BYTES] = "dstm_incpt_B", + [STAT_STREAM_INCPT_UP_BYTES] = "ustm_incpt_B", [STAT_STREAM_TCP_PLAIN] = "plain", - [STAT_STREAM_TCP_SSL] = "SSL", - [STAT_STREAM_DOWN_RX_BYTES] = "dstm_bytes", - [STAT_STREAM_UP_RX_BYTES] = "ustm_bytes", + [STAT_STREAM_TCP_SSL] = "ssl", [TFE_STAT_MAX] = NULL }; int tfe_stat_init(struct tfe_proxy * proxy, const char * profile) { - static const char * fieldstat_output = "./tfe.fieldstat"; + static const char * fieldstat_output = "log/tfe.fs2"; static const char * app_name = "tfe3a"; int value = 0, i = 0; diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index 2f72c69..70429ac 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -609,7 +609,7 @@ void ssl_manager_destroy(struct ssl_mgr * mgr) } -struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, +struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section, struct event_base * ev_base_gc, struct key_keeper * key_keeper, void * logger) { unsigned int stek_group_num = 0; @@ -705,7 +705,7 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section mgr->svc_fail_as_proto_err_cnt, mgr->svc_succ_as_app_not_pinning_cnt, mgr->svc_cnt_time_window); - + mgr->key_keeper = key_keeper; MESA_load_profile_uint_def(ini_profile, section, "trusted_cert_load_local", &(mgr->trusted_cert_load_local), 1); @@ -1971,145 +1971,6 @@ void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct return; } -/* - * Cleanly shut down an SSL socket. Libevent currently has no support for - * cleanly shutting down an SSL socket so we work around that by using a - * low-level event. This works for recent versions of OpenSSL. OpenSSL - * with the older SSL_shutdown() semantics, not exposing WANT_READ/WRITE - * may or may not work. - */ -UNUSED static struct ssl_shutdown_ctx * ssl_shutdown_ctx_new(struct ssl_stream * s_stream, struct event_base * evbase) -{ - struct ssl_shutdown_ctx * ctx = ALLOC(struct ssl_shutdown_ctx, 1); - ctx->evbase = evbase; - ctx->s_stream = s_stream; - ctx->ev = NULL; - ctx->mgr = s_stream->mgr; - ctx->dir = s_stream->dir; - ctx->retries = 0; - ctx->dir==CONN_DIR_DOWNSTREAM ? ATOMIC_INC(&(ctx->mgr->stat_val[SSL_DOWN_CLOSING])) - : ATOMIC_INC(&(ctx->mgr->stat_val[SSL_UP_CLOSING])); - - return ctx; -} - -static void ssl_shutdown_ctx_free(struct ssl_shutdown_ctx * ctx) -{ - ctx->dir==CONN_DIR_DOWNSTREAM ? ATOMIC_DEC(&(ctx->mgr->stat_val[SSL_DOWN_CLOSING])) - : ATOMIC_DEC(&(ctx->mgr->stat_val[SSL_UP_CLOSING])); - memset(ctx, 0, sizeof(struct ssl_shutdown_ctx)); - free(ctx); -} - -/* - * The shutdown socket event handler. This is either - * scheduled as a timeout-only event, or as a fd read or - * fd write event, depending on whether SSL_shutdown() - * indicates it needs read or write on the socket. - */ -static void pxy_ssl_shutdown_cb(evutil_socket_t fd, short what, void * arg) -{ - struct ssl_shutdown_ctx * ctx = (struct ssl_shutdown_ctx *) arg; - struct timeval retry_delay = {0, 100}; - - void * logger = ctx->s_stream->mgr->logger; - struct ssl_mgr* mgr=ctx->s_stream->mgr; - short want = 0; - int rv = 0, sslerr = 0; - if (ctx->ev) - { - event_free(ctx->ev); - ctx->ev = NULL; - } - - if(what == 0) - { - TFE_PROXY_STAT_INCREASE(STAT_FD_DEFER_CLOSE_IN_QUEUE, 1); - } - - /* - * Use the new (post-2008) semantics for SSL_shutdown() on a - * non-blocking socket. SSL_shutdown() returns -1 and WANT_READ - * if the other end's close notify was not received yet, and - * WANT_WRITE it could not write our own close notify. - * - * This is a good collection of recent and relevant documents: - * http://bugs.python.org/issue8108 - */ - if(what == EV_TIMEOUT) - { - SSL_set_shutdown(ctx->s_stream->ssl, SSL_RECEIVED_SHUTDOWN); - } - rv = SSL_shutdown(ctx->s_stream->ssl); - - if (rv == 1) - goto complete; - - if (rv != -1) - { - goto retry; - } - - switch ((sslerr = SSL_get_error(ctx->s_stream->ssl, rv))) - { - case SSL_ERROR_WANT_READ: want = EV_READ; - goto retry; - case SSL_ERROR_WANT_WRITE: want = EV_WRITE; - goto retry; - case SSL_ERROR_ZERO_RETURN: - case SSL_ERROR_SYSCALL: - case SSL_ERROR_SSL: goto complete; - default: TFE_LOG_ERROR(logger, "Unhandled SSL_shutdown() " - "error %i. Closing fd.\n", sslerr); - goto complete; - } - - goto complete; - -retry: - if (ctx->retries++ >= MAX_NET_RETRIES) - { - /* - struct tfe_stream_addr* addr=tfe_stream_addr_create_by_fd(fd, ctx->s_stream->dir); - char* addr_string=tfe_stream_addr_to_str(addr); - TFE_LOG_ERROR(logger, "Failed to shutdown %s SSL connection cleanly: %s " - "Max retries reached. Closing fd %d.", - tfe_stream_conn_dir_to_str(ctx->s_stream->dir), - addr_string, fd); - tfe_stream_addr_free(addr); - free(addr_string); - */ - if(ctx->s_stream->dir==CONN_DIR_DOWNSTREAM) - { - ATOMIC_INC(&(mgr->stat_val[SSL_DOWN_DIRTY_CLOSED])); - } - else - { - ATOMIC_INC(&(mgr->stat_val[SSL_UP_DIRTY_CLOSED])); - } - goto complete; - } - - ctx->ev = event_new(ctx->evbase, fd, want, pxy_ssl_shutdown_cb, ctx); - - if (ctx->ev) - { - event_add(ctx->ev, &retry_delay); - } - else - { - TFE_LOG_ERROR(logger, "Failed to shutdown SSL connection cleanly: " - "Cannot create event. Closing fd %d.", fd); - } - return; - -complete: - TFE_PROXY_STAT_INCREASE(STAT_FD_DEFER_CLOSE_SUCCESS, 1); - ssl_stream_free(ctx->s_stream); - evutil_closesocket(fd); - ssl_shutdown_ctx_free(ctx); -} - /* * Cleanly shutdown an SSL session on file descriptor fd using low-level * file descriptor readiness events on event base evbase. diff --git a/platform/src/tcp_stream.cpp b/platform/src/tcp_stream.cpp index f031f1b..be21d03 100644 --- a/platform/src/tcp_stream.cpp +++ b/platform/src/tcp_stream.cpp @@ -422,7 +422,7 @@ static void __conn_private_destory(struct tfe_conn_private * conn) free(conn); (void)ret; - TFE_PROXY_STAT_INCREASE(STAT_FD_INSTANT_CLOSE, 1); + TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE, 1); } static void __conn_private_destory_with_ssl(struct event_base * ev_base, @@ -445,6 +445,12 @@ static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg return; } + if (_stream->is_first_call_rxcb == 0) + { + TFE_PROXY_STAT_INCREASE(STAT_STREAM_BYPASS, 1); + _stream->is_first_call_rxcb = 1; + } + struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev); evbuffer_add_buffer(__output_buffer, __input_buffer); } @@ -589,6 +595,12 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) return; } + if (_stream->is_first_call_rxcb == 0) + { + TFE_PROXY_STAT_INCREASE(STAT_STREAM_INTERCEPT, 1); + _stream->is_first_call_rxcb = 1; + } + struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev); assert(inbuf != NULL && outbuf != NULL); @@ -673,15 +685,18 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg) if (dir == CONN_DIR_DOWNSTREAM) { - TFE_PROXY_STAT_INCREASE(STAT_STREAM_DOWN_RX_BYTES, rx_offset_increase); + TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_DOWN_BYTES, rx_offset_increase); _stream->downstream_rx_offset += rx_offset_increase; } else { - TFE_PROXY_STAT_INCREASE(STAT_STREAM_UP_RX_BYTES, rx_offset_increase); + TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_UP_BYTES, rx_offset_increase); _stream->upstream_rx_offset += rx_offset_increase; } + /* Total Bytes */ + TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, rx_offset_increase); + if(_stream->need_to_be_kill) { const static struct linger sl{.l_onoff = 1, .l_linger = 0}; @@ -997,18 +1012,18 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user) enum ssl_stream_action ssl_action = ssl_upstream_create_result_release_action(result); if (SSL_ACTION_PASSTHROUGH == ssl_action) { + _stream->tcp_passthough = true; _stream->conn_upstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_upstream); _stream->conn_downstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_downstream); __conn_private_enable(_stream->conn_downstream); __conn_private_enable(_stream->conn_upstream); - _stream->tcp_passthough = 1; _stream->defer_fd_downstream = 0; _stream->defer_fd_upstream = 0; } else if (SSL_ACTION_SHUTDOWN == ssl_action) { - tfe_stream_destory(_stream); + return tfe_stream_destory(_stream); } else { @@ -1232,7 +1247,8 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket struct tfe_proxy_tcp_options * tcp_options = &_stream->proxy_ref->tcp_options; /* Make it non-blocking */ - evutil_make_socket_nonblocking(fd); + int ret = evutil_make_socket_nonblocking(fd); + assert(ret >= 0); /* Recv Buffer */ if (tcp_options->sz_rcv_buffer >= 0) @@ -1307,6 +1323,8 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d", stream->str_stream_info, __ttl, fd); } + + (void)ret; } int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream) @@ -1325,8 +1343,7 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst if (unlikely(_stream->head.addr == NULL)) { TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.", - fd_downstream, fd_upstream); - goto __errout; + fd_downstream, fd_upstream); goto __errout; } _stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr); diff --git a/plugin/business/pangu-http/src/pangu_web_cache.cpp b/plugin/business/pangu-http/src/pangu_web_cache.cpp index 3ec55bd..ab22e07 100644 --- a/plugin/business/pangu-http/src/pangu_web_cache.cpp +++ b/plugin/business/pangu-http/src/pangu_web_cache.cpp @@ -13,7 +13,7 @@ #include #include -extern "C" +extern "C" { #include } @@ -30,7 +30,7 @@ enum cache_stat_field STAT_CACHE_READ_HIT, STAT_CACHE_READ_BYTES, STAT_CACHE_OVERRIDE_READ, - STAT_CACHE_OVERRIDE_READ_HIT, + STAT_CACHE_OVERRIDE_READ_HIT, STAT_CACHE_OVERRIDE_READ_BYTES, STAT_CACHE_READ_ERR, STAT_CACHE_READ_THROTTLE, @@ -56,7 +56,7 @@ enum cache_stat_field STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE, __CACHE_STAT_MAX }; - + struct cache_key_descr { int is_not_empty; @@ -68,7 +68,7 @@ struct cache_param { int ref_cnt; struct cache_key_descr key_descr; - + char no_revalidate; char cache_dyn_url; char cache_html; @@ -76,7 +76,7 @@ struct cache_param char ignore_req_nocache; char ignore_res_nocache; char force_caching; - + int min_use; time_t pinning_time_sec; time_t inactive_time_sec; @@ -109,9 +109,9 @@ struct cache_handle long long stat_val[__CACHE_STAT_MAX]; int fs_id[__CACHE_STAT_MAX]; struct event_base* gc_evbase; - struct event* gcev; + struct event* gcev; + - int cache_policy_enabled; //otherwise use default cache policy struct cache_param default_cache_policy; Maat_feather_t ref_feather; @@ -184,7 +184,7 @@ static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg) //translate bytes to mega bytes. FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))/(1024*1024)); break; - default: + default: FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))); break; } @@ -194,12 +194,12 @@ static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg) FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ], 0, FS_OP_SET, client_stat_sum.get_recv_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_HIT], 0, FS_OP_SET, client_stat_sum.get_succ_http+client_stat_sum.get_succ_redis); - FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_ERR], 0, + FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_ERR], 0, FS_OP_SET, client_stat_sum.get_err_http+client_stat_sum.get_err_redis); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_CNT], 0, FS_OP_SET, client_stat_sum.put_recv_num); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_ERR], 0, - FS_OP_SET, client_stat_sum.put_err_http+client_stat_sum.put_err_redis); + FS_OP_SET, client_stat_sum.put_err_http+client_stat_sum.put_err_redis); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024)); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION_HTTP], 0, FS_OP_SET, client_stat_sum.session_http); FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_THROTTLE], 0, FS_OP_SET, client_stat_sum.totaldrop_num); @@ -220,12 +220,12 @@ static void set_stat_spec(struct cache_stat_sepc* spec, const char* name, enum f spec->calc_type=calc_type; return; } -void cache_stat_init(struct cache_handle* cache, +void cache_stat_init(struct cache_handle* cache, const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins) { - const char* fieldstat_output="./cache.fieldstat"; + const char* fieldstat_output="log/cache.fs2"; const char* app_name="tfe_cache"; - + int value=0, i=0; screen_stat_handle_t fs_handle=NULL; fs_handle=FS_create_handle(); @@ -245,7 +245,7 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins) cache->fs_handle=fs_handle; struct cache_stat_sepc spec[__CACHE_STAT_MAX]; - + set_stat_spec(&spec[STAT_CACHE_READ], "cache_read",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_PEND_FORBIDDEN], "read_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_READ_VERIFY], "read_verify",FS_STYLE_FIELD, FS_CALC_CURRENT); @@ -279,7 +279,7 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins) set_stat_spec(&spec[STAT_CACHE_WRITE_OBJ_SIZE], "wr_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); set_stat_spec(&spec[STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE], "or_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT); - + for(i=0;i<__CACHE_STAT_MAX;i++) { if(spec[i].style==FS_STYLE_HISTOGRAM) @@ -295,8 +295,8 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins) // FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); FS_register_ratio(cache->fs_handle, - cache->fs_id[STAT_CACHE_READ_HIT], - cache->fs_id[STAT_CACHE_READ], + cache->fs_id[STAT_CACHE_READ_HIT], + cache->fs_id[STAT_CACHE_READ], 1, FS_STYLE_STATUS, FS_CALC_CURRENT, @@ -306,19 +306,19 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins) FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); FS_register_ratio(cache->fs_handle, - cache->fs_id[STAT_CACHE_OVERRIDE_READ_HIT], - cache->fs_id[STAT_CACHE_OVERRIDE_READ], + cache->fs_id[STAT_CACHE_OVERRIDE_READ_HIT], + cache->fs_id[STAT_CACHE_OVERRIDE_READ], 1, FS_STYLE_STATUS, FS_CALC_CURRENT, "or_hit"); - + FS_start(cache->fs_handle); - + struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here. - cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, web_cache_stat_cb, cache); - evtimer_add(cache->gcev, &gc_delay); + cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, web_cache_stat_cb, cache); + evtimer_add(cache->gcev, &gc_delay); return; } @@ -447,7 +447,7 @@ char* url_remove_qs(const char* url, int qs_num, char* ignore_qs[]) char* target_url=ALLOC(char, target_size); int i=0, shall_ignore=0; char *token=NULL,*sub_token=NULL,*saveptr; - + char* query_string=NULL; query_string=strchr(url_copy, '?'); if(query_string!=NULL) @@ -492,10 +492,10 @@ char* get_cache_key(const struct tfe_http_half * request, const struct cache_key char* url_no_qs=NULL; const char* cookie=NULL; char cookie_val[1024]={0}; //most 1024 bytes for cookie key - + size_t key_size=strlen(request->req_spec.url)+sizeof(cookie_val); char* cache_key=ALLOC(char, key_size); - + if(desc->qs_num>0) { url_no_qs=url_remove_qs(request->req_spec.url, desc->qs_num, desc->ignore_qs); @@ -517,7 +517,7 @@ char* get_cache_key(const struct tfe_http_half * request, const struct cache_key return cache_key; } -void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, +void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_def_large, MAAT_RULE_EX_DATA* ad, long argl, void *argp) { struct cache_handle* cache=(struct cache_handle*) argp; @@ -536,7 +536,7 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de return; } struct cache_param* param=ALLOC(struct cache_param, 1); - + *param=cache->default_cache_policy; param->ref_cnt=1; pthread_mutex_init(&(param->lock), NULL); @@ -545,7 +545,7 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de { qs=cJSON_GetObjectItem(key_desc,"ignore_qs"); if(qs && qs->type==cJSON_Array) - { + { param->key_descr.qs_num=cJSON_GetArraySize(qs); param->key_descr.ignore_qs=ALLOC(char*, param->key_descr.qs_num); for(i=0; ikey_descr.qs_num; i++) @@ -556,12 +556,12 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de strncat(param->key_descr.ignore_qs[i], item->valuestring, len); strncat(param->key_descr.ignore_qs[i], "=", len); } - } + } item=cJSON_GetObjectItem(key_desc,"cookie"); if(item && item->type==cJSON_String) { - param->key_descr.include_cookie=tfe_strdup(item->valuestring); - + param->key_descr.include_cookie=tfe_strdup(item->valuestring); + } if(param->key_descr.qs_num>0||param->key_descr.include_cookie!=NULL) { @@ -570,31 +570,31 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de } - + item=cJSON_GetObjectItem(json,"no_revalidate"); if(item && item->type==cJSON_Number) param->no_revalidate=item->valueint; - + item=cJSON_GetObjectItem(json,"cache_dyn_url"); if(item && item->type==cJSON_Number) param->cache_dyn_url=item->valueint; - + item=cJSON_GetObjectItem(json,"cache_cookied_cont"); if(item && item->type==cJSON_Number) param->cache_cookied_cont=item->valueint; - + item=cJSON_GetObjectItem(json,"ignore_req_nocache"); if(item && item->type==cJSON_Number) param->ignore_req_nocache=item->valueint; - - item=cJSON_GetObjectItem(json,"ignore_res_nocache"); + + item=cJSON_GetObjectItem(json,"ignore_res_nocache"); if(item && item->type==cJSON_Number) param->ignore_res_nocache=item->valueint; - - item=cJSON_GetObjectItem(json,"force_caching"); + + item=cJSON_GetObjectItem(json,"force_caching"); if(item && item->type==cJSON_Number) param->force_caching=item->valueint; - item=cJSON_GetObjectItem(json,"min_use"); + item=cJSON_GetObjectItem(json,"min_use"); if(item && item->type==cJSON_Number) param->min_use=item->valueint; - + item=cJSON_GetObjectItem(json,"pinning_time"); if(item && item->type==cJSON_String) param->pinning_time_sec=time_unit_sec(item->valuestring); - + item=cJSON_GetObjectItem(json,"inactive_time"); if(item && item->type==cJSON_String) param->inactive_time_sec=time_unit_sec(item->valuestring); @@ -603,10 +603,10 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de item=cJSON_GetObjectItem(json,"max_cache_obj_size"); if(item && item->type==cJSON_String) param->max_cache_obj_size=storage_unit_byte(item->valuestring); - + item=cJSON_GetObjectItem(json,"min_cache_obj_size"); if(item && item->type==cJSON_String) param->min_cache_obj_size=storage_unit_byte(item->valuestring); - + cJSON_Delete(json); *ad=param; return; @@ -622,7 +622,7 @@ void cache_param_free(int idx, const struct Maat_rule_t* rule, const char* srv_d pthread_mutex_lock(&(param->lock)); param->ref_cnt--; if(param->ref_cnt>0) - { + { pthread_mutex_unlock(&(param->lock)); return; } @@ -688,11 +688,11 @@ static void cache_key_bloom_gc_cb(evutil_socket_t fd, short what, void * arg) return; } -struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, +struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct event_base* gc_evbase, Maat_feather_t feather, void *logger) { struct cache_handle* cache=ALLOC(struct cache_handle, 1); - int temp=0; + int temp=0; struct event* ev=NULL; char statsd_server_ip[TFE_SYMBOL_MAX]={0}; char histogram_bins[TFE_SYMBOL_MAX]={0}; @@ -702,18 +702,18 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha cache->thread_count=tfe_proxy_get_work_thread_count(); cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count); cache->cache_key_bloom=ALLOC(struct cache_bloom, cache->thread_count); - struct cache_bloom* p_bloom=NULL; - MESA_load_profile_int_def(profile_path, section, "cache_policy_enabled", + struct cache_bloom* p_bloom=NULL; + MESA_load_profile_int_def(profile_path, section, "cache_policy_enabled", &(cache->cache_policy_enabled), 1); - - MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_size", + + MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_size", (int*)&(cache->cache_key_bloom_size), 16*1000*1000); - MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_life", + MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_life", &(cache->cache_key_bloom_life), 30*60); struct timeval gc_refresh_delay = {cache->cache_key_bloom_life, 0}; unsigned int i=0; - + struct tango_cache_parameter *cache_client_param=tango_cache_parameter_new(profile_path, section, logger); for(i=0; ithread_count; i++) { @@ -729,17 +729,17 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha { goto error_out; } - ev = event_new(tfe_proxy_get_work_thread_evbase(i), -1, EV_PERSIST, cache_key_bloom_gc_cb, p_bloom); + ev = event_new(tfe_proxy_get_work_thread_evbase(i), -1, EV_PERSIST, cache_key_bloom_gc_cb, p_bloom); evtimer_add(ev, &gc_refresh_delay); } - + cache->clients[i]=tango_cache_instance_new(cache_client_param,tfe_proxy_get_work_thread_evbase(i), logger); if(cache->clients[i]==NULL) { goto error_out; } } - + MESA_load_profile_int_def(profile_path, section, "get_concurrency_max", &temp, 1000*1000); cache->get_concurrency_max=temp; MESA_load_profile_int_def(profile_path, section, "put_concurrency_max", &(temp), 1000*1000); @@ -749,7 +749,7 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024); cache->cache_undefined_obj_min_size=temp; - cache->gc_evbase=gc_evbase; + cache->gc_evbase=gc_evbase; cache->default_cache_policy.key_descr.qs_num=0; cache->default_cache_policy.no_revalidate=0; @@ -762,7 +762,7 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha cache->default_cache_policy.inactive_time_sec=0; cache->default_cache_policy.max_cache_size=0; - MESA_load_profile_int_def(profile_path, section, "min_use", &(cache->default_cache_policy.min_use), 0); + MESA_load_profile_int_def(profile_path, section, "min_use", &(cache->default_cache_policy.min_use), 0); MESA_load_profile_int_def(profile_path, section, "max_cache_obj_size", &(temp), 1024*1024*1024); cache->default_cache_policy.max_cache_obj_size=temp; //<1GB by default @@ -773,8 +773,8 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha { cache->table_url_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_URL"); cache->table_cookie_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_COOKIE"); - - cache->cache_param_idx=Maat_rule_get_ex_new_index(feather, "PXY_CACHE_COMPILE", + + cache->cache_param_idx=Maat_rule_get_ex_new_index(feather, "PXY_CACHE_COMPILE", cache_param_new, cache_param_free, cache_param_dup, 0, cache); cache->ref_feather=feather; @@ -794,7 +794,7 @@ error_out: } static char* read_http1_hdr(const char* hdr, const char* field_name) -{ +{ const char *p=NULL, *q=NULL; char* value=NULL; p=strcasestr(hdr, field_name); @@ -823,7 +823,7 @@ struct cache_query_context const struct cache_mid* ref_mid; char* url; struct cached_meta meta; - + struct tango_cache_result* ref_tango_cache_result; struct future* f_tango_cache_fetch; }; @@ -907,16 +907,16 @@ static void cache_query_obj_on_succ(future_result_t * result, void * user) ctx->meta.content_length=ctx->ref_tango_cache_result->tlength; TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query hit: %s", ctx->url); break; - case RESULT_TYPE_USERTAG: + case RESULT_TYPE_USERTAG: cached_meta_set(&ctx->meta, RESULT_TYPE_USERTAG, ctx->ref_tango_cache_result->data_frag, ctx->ref_tango_cache_result->size); break; case RESULT_TYPE_MISS: TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query miss: %s", ctx->url); //NOT break intentionally. case RESULT_TYPE_END: - //last call. - ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_READING])); - promise_dettach_ctx(p); + //last call. + ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_READING])); + promise_dettach_ctx(p); last_call=1; break; case RESULT_TYPE_BODY: @@ -928,7 +928,7 @@ static void cache_query_obj_on_succ(future_result_t * result, void * user) promise_success(p, ctx); if(last_call) { - cache_query_ctx_free_cb(ctx); + cache_query_ctx_free_cb(ctx); promise_finish(p); } return; @@ -939,7 +939,7 @@ static void cache_query_obj_on_fail(enum e_future_error err, const char * what, struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p); promise_failed(p, err, what); promise_finish(p); - ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_READING])); + ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_READING])); cache_query_ctx_free_cb(ctx); return; } @@ -999,7 +999,7 @@ static void cache_read_meta_on_succ(future_result_t * result, void * user) switch(_result->type) { - case RESULT_TYPE_HEADER: + case RESULT_TYPE_HEADER: ctx->cached_obj_meta.content_length=_result->tlength; cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size); ctx->status=PENDING_RESULT_REVALIDATE; @@ -1018,8 +1018,8 @@ static void cache_read_meta_on_succ(future_result_t * result, void * user) case RESULT_TYPE_END: //last call. ctx->location=_result->location; - ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING])); - promise_dettach_ctx(p); + ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING])); + promise_dettach_ctx(p); promise_success(p, ctx); cache_pending_ctx_free_cb(ctx); break; @@ -1032,8 +1032,8 @@ static void cache_read_meta_on_fail(enum e_future_error err, const char * what, { struct promise * p = (struct promise *) user; struct cache_pending_context* ctx=(struct cache_pending_context*)promise_dettach_ctx(p); - promise_failed(p, err, what); - ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING])); + promise_failed(p, err, what); + ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING])); cache_pending_ctx_free_cb(ctx); return; } @@ -1042,7 +1042,7 @@ static void cache_read_meta_on_fail(enum e_future_error err, const char * what, #define CACHE_ACTION_BYPASS 0x80 enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_half * request, struct cache_mid** mid, struct future* f_revalidate) -{ +{ enum cache_pending_result result=PENDING_RESULT_FOBIDDEN; struct Maat_rule_t cache_policy; struct cache_param* param=&(handle->default_cache_policy); @@ -1056,14 +1056,14 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u if(cookie) { _mid->has_cookie=1; - } + } _mid->is_dyn_url=is_dynamic_url(request->req_spec.url); if(handle->cache_policy_enabled) { ret=Maat_full_scan_string(handle->ref_feather, handle->table_url_constraint, CHARSET_UTF8, request->req_spec.url, strlen(request->req_spec.url), &cache_policy, NULL, 1, &scan_mid, thread_id); - + if(cookie && ret<=0) { @@ -1075,7 +1075,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u if(ret>0) { - + ex_data=Maat_rule_get_ex_data(handle->ref_feather, &cache_policy, handle->cache_param_idx); if(ex_data!=NULL) { @@ -1093,13 +1093,13 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u _mid->cache_key=get_cache_key(request, &(param->key_descr)); } TFE_LOG_DEBUG(handle->logger, "cache policy %d matched: url=%s alt-key=%s", - cache_policy.config_id, + cache_policy.config_id, request->req_spec.url, _mid->cache_key!=NULL?_mid->cache_key:"null"); ATOMIC_INC(&(handle->stat_val[STAT_CACHE_POLICY_MATCH])); } - if(_mid->shall_bypass || + if(_mid->shall_bypass || (!param->force_caching && !param->cache_dyn_url && _mid->is_dyn_url && param->key_descr.qs_num==0) || (!param->force_caching && !param->cache_cookied_cont && _mid->has_cookie)) { @@ -1143,7 +1143,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u } else { - result=PENDING_RESULT_REVALIDATE; + result=PENDING_RESULT_REVALIDATE; } break; } @@ -1152,22 +1152,22 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u _mid->result=result; return _mid->result; } - + struct tango_cache_meta_get meta; memset(&meta, 0, sizeof(meta)); meta.url=_mid->cache_key!=NULL?_mid->cache_key:request->req_spec.url; meta.get = _mid->req_fresshness; - + struct promise* p=future_to_promise(f_revalidate); struct cache_pending_context* ctx=ALLOC(struct cache_pending_context, 1); - ctx->status=PENDING_RESULT_FOBIDDEN; + ctx->status=PENDING_RESULT_FOBIDDEN; ctx->ref_handle=handle; ctx->url=tfe_strdup(request->req_spec.url); ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE)); ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH)); promise_set_ctx(p, ctx, cache_pending_ctx_free_cb); - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PENDING])); + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PENDING])); ctx->f_tango_cache_fetch=future_create("_cache_pend", cache_read_meta_on_succ, cache_read_meta_on_fail, p); ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta); if(ret<0) @@ -1178,7 +1178,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u return _mid->result; } _mid->result=PENDING_RESULT_REVALIDATE; - + return _mid->result; } int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id, @@ -1188,10 +1188,10 @@ int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id, struct promise* p=NULL; struct cache_mid* _mid=*mid; assert(_mid->result!=PENDING_RESULT_FOBIDDEN); - + if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_READING])) > ATOMIC_READ(&(handle->put_concurrency_max))) - { - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READ_THROTTLE])); + { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READ_THROTTLE])); return -1; } @@ -1204,11 +1204,11 @@ int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id, query_ctx->ref_mid=_mid; query_ctx->url=tfe_strdup(request->req_spec.url); - p=future_to_promise(f); + p=future_to_promise(f); promise_allow_many_successes(p); promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb); - - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READING])); + + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READING])); query_ctx->f_tango_cache_fetch=future_create("_cache_read", cache_query_obj_on_succ, cache_query_obj_on_fail, p); int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta, _mid->location); if(ret<0) @@ -1247,7 +1247,7 @@ static void wrap_cache_write_on_fail(enum e_future_error err, const char * what, cache_write_future_ctx_free(ctx); } -struct cache_write_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id, +struct cache_write_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id, const struct tfe_http_session * session, struct cache_mid **mid) { struct cache_write_context* write_ctx=NULL; @@ -1259,9 +1259,9 @@ struct cache_write_context* web_cache_write_start(struct cache_handle* handle, u char *tmp=NULL; int i=0, is_undefined_obj=0; size_t content_len=0; - const struct cache_param* param=NULL; + const struct cache_param* param=NULL; struct cache_mid* _mid=*mid; - + if(_mid!=NULL && _mid->is_using_exception_param) { param=_mid->param; @@ -1286,20 +1286,20 @@ struct cache_write_context* web_cache_write_start(struct cache_handle* handle, u { ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITE_FORBIDEN])); TFE_LOG_DEBUG(handle->logger, "cache write forbiden: %s", session->req->req_spec.url); - return NULL; + return NULL; } break; case REVALIDATE: - case ALLOWED: + case ALLOWED: case UNDEFINED: if(param->force_caching) { break; } - else if(_mid->shall_bypass + else if(_mid->shall_bypass || (param->max_cache_obj_size!=0 && content_len > param->max_cache_obj_size) || (param->min_cache_obj_size > content_len) - || (!param->cache_cookied_cont && _mid->has_cookie) + || (!param->cache_cookied_cont && _mid->has_cookie) || (!param->cache_html && _mid->is_html) ) { @@ -1319,8 +1319,8 @@ struct cache_write_context* web_cache_write_start(struct cache_handle* handle, u break; } if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_WRITING])) > handle->get_concurrency_max) - { - ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITE_THROTTLE])); + { + ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITE_THROTTLE])); return NULL; } const char* key=NULL; @@ -1346,7 +1346,7 @@ struct cache_write_context* web_cache_write_start(struct cache_handle* handle, u } } ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITING])); - + struct tango_cache_meta_put meta; memset(&meta, 0, sizeof(meta)); meta.url=_mid->cache_key?_mid->cache_key:session->req->req_spec.url; @@ -1413,7 +1413,7 @@ int web_cache_write_end(struct cache_write_context* ctx) { //upload too slow or storage server error; TFE_LOG_DEBUG(ctx->ref_cache_handle->logger, "cache upload failed: %s",ctx->future_ctx->url); - cache_write_future_ctx_free(ctx->future_ctx); + cache_write_future_ctx_free(ctx->future_ctx); ctx->future_ctx=NULL; ATOMIC_INC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITE_ERR])); ret=-1;