#167 Add statistics for intercepted connections, intercepted bytes, and PASSTHROUGH connections, and adjust the output location of the FieldStat status file.

luqiuwen
2019-09-16 16:40:31 +08:00
committed by 陆秋文
parent 8c0f135877
commit f1fd1d0ad3
8 changed files with 166 additions and 275 deletions

View File

@@ -47,7 +47,7 @@ struct promise
{
struct future f;
void * ctx;
char has_timeout;
char ref_cnt;
char may_success_many_times;
promise_ctx_destroy_cb * cb_ctx_destroy;
@@ -82,7 +82,7 @@ void future_promise_library_init(const char* profile)
}
}
if(g_FP_instance.no_stats)
{
g_is_FP_init=1;
return;
}
@@ -101,7 +101,7 @@ void future_promise_library_init(const char* profile)
g_FP_instance.name_table=htable;
screen_stat_handle_t fs=NULL;
-const char* stat_path="./future.fieldstat";
+const char* stat_path="log/future.fs2";
const char* app_name="FP";
fs=FS_create_handle();
FS_set_para(fs, APP_NAME, app_name, strlen(app_name)+1);
@@ -135,7 +135,7 @@ static void __promise_destroy(struct promise *p)
if (p->cb_ctx_destroy != NULL)
{
p->cb_ctx_destroy(p->ctx);
}
if(!g_FP_instance.no_stats) FS_operate(g_FP_instance.fs_handle,g_FP_instance.fsid_f_num, 0, FS_OP_SUB, 1);
memset(p, 0, sizeof(struct promise));
free(p);

View File

@@ -33,16 +33,28 @@ cache_store_object_way=2
redis_cache_object_size=1024000
#If CACHE_STORE_OBJECT_WAY is not 0, we will use redis to store meta and object.
redis_cluster_addrs=10.4.20.211:9001,10.4.20.212:9001,10.4.20.213:9001,10.4.20.214:9001,10.4.20.215:9001,10.4.20.216:9001,10.4.20.217:9001,10.4.20.218:9001
#Configs of WiredLB for Minios load balancer.
-#WIREDLB_OVERRIDE=1
+wiredlb_override=1
wiredlb_health_port=42310
+wiredlb_topic=MinioFileLog
+wiredlb_datacenter=k18consul-tse
+wiredlb_health_port=52102
+wiredlb_group=FileLog
+log_fsstat_appname=tango_log_file
+log_fsstat_filepath=./log/tango_log_file.fs2
+log_fsstat_interval=10
+log_fsstat_trig=1
+log_fsstat_dst_ip=10.4.20.202
+log_fsstat_dst_port=8125
[maat]
# 0:json 1: redis 2: iris
maat_input_mode=1
table_info=resource/pangu/table_info.conf
json_cfg_file=resource/
-stat_file=log/pangu_scan.status
+stat_file=log/pangu_scan.fs2
full_cfg_dir=pangu_policy/full/index/
inc_cfg_dir=pangu_policy/inc/index/
maat_redis_server=10.4.34.4
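Note: the new log_fsstat_* keys move the file-log FieldStat output under log/ and point it at a statsd-style destination (10.4.20.202:8125). A minimal sketch of how the numeric keys could be consumed with the MESA_load_profile_int_def() helper already used in this codebase; the section name "log" and the wrapper function are placeholders, not code from this commit.

static void load_fsstat_keys(const char *profile_path)
{
    int interval = 0, trig = 0, dst_port = 0;
    /* hypothetical section name "log"; the real section is not shown in this hunk */
    MESA_load_profile_int_def(profile_path, "log", "log_fsstat_interval", &interval, 10);
    MESA_load_profile_int_def(profile_path, "log", "log_fsstat_trig", &trig, 1);
    MESA_load_profile_int_def(profile_path, "log", "log_fsstat_dst_port", &dst_port, 8125);
    (void)interval; (void)trig; (void)dst_port;
}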

View File

@@ -91,6 +91,7 @@ struct tfe_stream_private
struct ssl_stream * ssl_upstream;
};
+uint8_t is_first_call_rxcb;
uint8_t is_plugin_opened;
int calling_idx;

View File

@@ -17,10 +17,7 @@ enum TFE_STAT_FIELD
/* FDs */
STAT_FD_OPEN_BY_KNI_ACCEPT,
STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL,
-/* FDs */
-STAT_FD_INSTANT_CLOSE,
-STAT_FD_DEFER_CLOSE_IN_QUEUE,
-STAT_FD_DEFER_CLOSE_SUCCESS,
+STAT_FD_CLOSE,
/* Stream */
STAT_STREAM_OPEN,
@@ -31,14 +28,16 @@ enum TFE_STAT_FIELD
STAT_STREAM_CLS_UP_ERR,
STAT_STREAM_CLS_KILL,
-/* Stream Protocol */
+/* Action */
+STAT_STREAM_INTERCEPT,
+STAT_STREAM_BYPASS,
+STAT_STREAM_INCPT_BYTES,
+STAT_STREAM_INCPT_DOWN_BYTES,
+STAT_STREAM_INCPT_UP_BYTES,
+/* Protocol */
STAT_STREAM_TCP_PLAIN,
STAT_STREAM_TCP_SSL,
-/* RX DATA */
-STAT_STREAM_DOWN_RX_BYTES,
-STAT_STREAM_UP_RX_BYTES,
TFE_STAT_MAX
};
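The new fields split interception accounting into a per-stream decision count (STAT_STREAM_INTERCEPT / STAT_STREAM_BYPASS) and byte counters, one total plus one per direction, replacing the old RX byte fields. A minimal sketch of the intended accounting, assuming the TFE_PROXY_STAT_INCREASE(field, delta) macro used in the stream read callbacks later in this commit:

/* Sketch only, not code from this commit. */
static void account_intercepted_rx(int dir_is_downstream, size_t nbytes)
{
    if (dir_is_downstream)
        TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_DOWN_BYTES, nbytes);
    else
        TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_UP_BYTES, nbytes);
    /* every intercepted byte is also added to the combined counter */
    TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, nbytes);
}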

View File

@@ -307,9 +307,7 @@ static const char * __str_stat_spec_map[] =
[STAT_SIGPIPE] = "SIGPIPE",
[STAT_FD_OPEN_BY_KNI_ACCEPT] = "fd_rx",
[STAT_FD_CLOSE_BY_KNI_ACCEPT_FAIL] = "fd_rx_err",
-[STAT_FD_INSTANT_CLOSE] = "fd_inst_cls",
-[STAT_FD_DEFER_CLOSE_IN_QUEUE] = "fd_dfr_cls",
-[STAT_FD_DEFER_CLOSE_SUCCESS] = "fd_dfr_clsd",
+[STAT_FD_CLOSE] = "fd_inst_cls",
[STAT_STREAM_OPEN] = "stm_open",
[STAT_STREAM_CLS] = "stm_cls",
[STAT_STREAM_CLS_DOWN_EOF] = "dstm_eof",
@@ -317,16 +315,19 @@ static const char * __str_stat_spec_map[] =
[STAT_STREAM_CLS_DOWN_ERR] = "dstm_err",
[STAT_STREAM_CLS_UP_ERR] = "ustm_err",
[STAT_STREAM_CLS_KILL] = "stm_kill",
+[STAT_STREAM_INTERCEPT] = "stm_incpt",
+[STAT_STREAM_BYPASS] = "stm_byp",
+[STAT_STREAM_INCPT_BYTES] = "stm_incpt_B",
+[STAT_STREAM_INCPT_DOWN_BYTES] = "dstm_incpt_B",
+[STAT_STREAM_INCPT_UP_BYTES] = "ustm_incpt_B",
[STAT_STREAM_TCP_PLAIN] = "plain",
-[STAT_STREAM_TCP_SSL] = "SSL",
+[STAT_STREAM_TCP_SSL] = "ssl",
-[STAT_STREAM_DOWN_RX_BYTES] = "dstm_bytes",
-[STAT_STREAM_UP_RX_BYTES] = "ustm_bytes",
[TFE_STAT_MAX] = NULL
};
int tfe_stat_init(struct tfe_proxy * proxy, const char * profile)
{
-static const char * fieldstat_output = "./tfe.fieldstat";
+static const char * fieldstat_output = "log/tfe.fs2";
static const char * app_name = "tfe3a";
int value = 0, i = 0;
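Because __str_stat_spec_map is indexed by enum TFE_STAT_FIELD and terminated at [TFE_STAT_MAX], every enum change above must be mirrored here; a designated initializer that is forgotten simply leaves a NULL hole. A hypothetical startup guard (not part of this commit) that could sit inside tfe_stat_init to fail fast on such a mismatch:

/* Hedged sketch: verify every stat field has a name before registering it. */
for (i = 0; i < TFE_STAT_MAX; i++)
{
    assert(__str_stat_spec_map[i] != NULL);
}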

View File

@@ -609,7 +609,7 @@ void ssl_manager_destroy(struct ssl_mgr * mgr)
}
struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section,
struct event_base * ev_base_gc, struct key_keeper * key_keeper, void * logger)
{
unsigned int stek_group_num = 0;
@@ -705,7 +705,7 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section
mgr->svc_fail_as_proto_err_cnt,
mgr->svc_succ_as_app_not_pinning_cnt,
mgr->svc_cnt_time_window);
mgr->key_keeper = key_keeper;
MESA_load_profile_uint_def(ini_profile, section, "trusted_cert_load_local",
&(mgr->trusted_cert_load_local), 1);
@@ -1971,145 +1971,6 @@ void ssl_async_downstream_create(struct future * f, struct ssl_mgr * mgr, struct
return;
}
/*
* Cleanly shut down an SSL socket. Libevent currently has no support for
* cleanly shutting down an SSL socket so we work around that by using a
* low-level event. This works for recent versions of OpenSSL. OpenSSL
* with the older SSL_shutdown() semantics, not exposing WANT_READ/WRITE
* may or may not work.
*/
UNUSED static struct ssl_shutdown_ctx * ssl_shutdown_ctx_new(struct ssl_stream * s_stream, struct event_base * evbase)
{
struct ssl_shutdown_ctx * ctx = ALLOC(struct ssl_shutdown_ctx, 1);
ctx->evbase = evbase;
ctx->s_stream = s_stream;
ctx->ev = NULL;
ctx->mgr = s_stream->mgr;
ctx->dir = s_stream->dir;
ctx->retries = 0;
ctx->dir==CONN_DIR_DOWNSTREAM ? ATOMIC_INC(&(ctx->mgr->stat_val[SSL_DOWN_CLOSING]))
: ATOMIC_INC(&(ctx->mgr->stat_val[SSL_UP_CLOSING]));
return ctx;
}
static void ssl_shutdown_ctx_free(struct ssl_shutdown_ctx * ctx)
{
ctx->dir==CONN_DIR_DOWNSTREAM ? ATOMIC_DEC(&(ctx->mgr->stat_val[SSL_DOWN_CLOSING]))
: ATOMIC_DEC(&(ctx->mgr->stat_val[SSL_UP_CLOSING]));
memset(ctx, 0, sizeof(struct ssl_shutdown_ctx));
free(ctx);
}
/*
* The shutdown socket event handler. This is either
* scheduled as a timeout-only event, or as a fd read or
* fd write event, depending on whether SSL_shutdown()
* indicates it needs read or write on the socket.
*/
static void pxy_ssl_shutdown_cb(evutil_socket_t fd, short what, void * arg)
{
struct ssl_shutdown_ctx * ctx = (struct ssl_shutdown_ctx *) arg;
struct timeval retry_delay = {0, 100};
void * logger = ctx->s_stream->mgr->logger;
struct ssl_mgr* mgr=ctx->s_stream->mgr;
short want = 0;
int rv = 0, sslerr = 0;
if (ctx->ev)
{
event_free(ctx->ev);
ctx->ev = NULL;
}
if(what == 0)
{
TFE_PROXY_STAT_INCREASE(STAT_FD_DEFER_CLOSE_IN_QUEUE, 1);
}
/*
* Use the new (post-2008) semantics for SSL_shutdown() on a
* non-blocking socket. SSL_shutdown() returns -1 and WANT_READ
* if the other end's close notify was not received yet, and
* WANT_WRITE it could not write our own close notify.
*
* This is a good collection of recent and relevant documents:
* http://bugs.python.org/issue8108
*/
if(what == EV_TIMEOUT)
{
SSL_set_shutdown(ctx->s_stream->ssl, SSL_RECEIVED_SHUTDOWN);
}
rv = SSL_shutdown(ctx->s_stream->ssl);
if (rv == 1)
goto complete;
if (rv != -1)
{
goto retry;
}
switch ((sslerr = SSL_get_error(ctx->s_stream->ssl, rv)))
{
case SSL_ERROR_WANT_READ: want = EV_READ;
goto retry;
case SSL_ERROR_WANT_WRITE: want = EV_WRITE;
goto retry;
case SSL_ERROR_ZERO_RETURN:
case SSL_ERROR_SYSCALL:
case SSL_ERROR_SSL: goto complete;
default: TFE_LOG_ERROR(logger, "Unhandled SSL_shutdown() "
"error %i. Closing fd.\n", sslerr);
goto complete;
}
goto complete;
retry:
if (ctx->retries++ >= MAX_NET_RETRIES)
{
/*
struct tfe_stream_addr* addr=tfe_stream_addr_create_by_fd(fd, ctx->s_stream->dir);
char* addr_string=tfe_stream_addr_to_str(addr);
TFE_LOG_ERROR(logger, "Failed to shutdown %s SSL connection cleanly: %s "
"Max retries reached. Closing fd %d.",
tfe_stream_conn_dir_to_str(ctx->s_stream->dir),
addr_string, fd);
tfe_stream_addr_free(addr);
free(addr_string);
*/
if(ctx->s_stream->dir==CONN_DIR_DOWNSTREAM)
{
ATOMIC_INC(&(mgr->stat_val[SSL_DOWN_DIRTY_CLOSED]));
}
else
{
ATOMIC_INC(&(mgr->stat_val[SSL_UP_DIRTY_CLOSED]));
}
goto complete;
}
ctx->ev = event_new(ctx->evbase, fd, want, pxy_ssl_shutdown_cb, ctx);
if (ctx->ev)
{
event_add(ctx->ev, &retry_delay);
}
else
{
TFE_LOG_ERROR(logger, "Failed to shutdown SSL connection cleanly: "
"Cannot create event. Closing fd %d.", fd);
}
return;
complete:
TFE_PROXY_STAT_INCREASE(STAT_FD_DEFER_CLOSE_SUCCESS, 1);
ssl_stream_free(ctx->s_stream);
evutil_closesocket(fd);
ssl_shutdown_ctx_free(ctx);
}
/*
* Cleanly shutdown an SSL session on file descriptor fd using low-level
* file descriptor readiness events on event base evbase.

View File

@@ -422,7 +422,7 @@ static void __conn_private_destory(struct tfe_conn_private * conn)
free(conn);
(void)ret;
-TFE_PROXY_STAT_INCREASE(STAT_FD_INSTANT_CLOSE, 1);
+TFE_PROXY_STAT_INCREASE(STAT_FD_CLOSE, 1);
}
static void __conn_private_destory_with_ssl(struct event_base * ev_base,
@@ -445,6 +445,12 @@ static void __stream_bev_passthrough_readcb(struct bufferevent * bev, void * arg
return;
}
+if (_stream->is_first_call_rxcb == 0)
+{
+TFE_PROXY_STAT_INCREASE(STAT_STREAM_BYPASS, 1);
+_stream->is_first_call_rxcb = 1;
+}
struct evbuffer * __output_buffer = bufferevent_get_output(peer_conn->bev);
evbuffer_add_buffer(__output_buffer, __input_buffer);
}
@@ -589,6 +595,12 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
return;
}
+if (_stream->is_first_call_rxcb == 0)
+{
+TFE_PROXY_STAT_INCREASE(STAT_STREAM_INTERCEPT, 1);
+_stream->is_first_call_rxcb = 1;
+}
struct evbuffer * outbuf = bufferevent_get_output(peer_conn->bev);
assert(inbuf != NULL && outbuf != NULL);
@@ -673,15 +685,18 @@ static void __stream_bev_readcb(struct bufferevent * bev, void * arg)
if (dir == CONN_DIR_DOWNSTREAM)
{
-TFE_PROXY_STAT_INCREASE(STAT_STREAM_DOWN_RX_BYTES, rx_offset_increase);
+TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_DOWN_BYTES, rx_offset_increase);
_stream->downstream_rx_offset += rx_offset_increase;
}
else
{
-TFE_PROXY_STAT_INCREASE(STAT_STREAM_UP_RX_BYTES, rx_offset_increase);
+TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_UP_BYTES, rx_offset_increase);
_stream->upstream_rx_offset += rx_offset_increase;
}
+/* Total Bytes */
+TFE_PROXY_STAT_INCREASE(STAT_STREAM_INCPT_BYTES, rx_offset_increase);
if(_stream->need_to_be_kill)
{
const static struct linger sl{.l_onoff = 1, .l_linger = 0};
@@ -997,18 +1012,18 @@ void ssl_upstream_create_on_success(future_result_t * result, void * user)
enum ssl_stream_action ssl_action = ssl_upstream_create_result_release_action(result);
if (SSL_ACTION_PASSTHROUGH == ssl_action)
{
-_stream->tcp_passthough = true;
_stream->conn_upstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_upstream);
_stream->conn_downstream = __conn_private_create_by_fd(_stream, _stream->defer_fd_downstream);
__conn_private_enable(_stream->conn_downstream);
__conn_private_enable(_stream->conn_upstream);
+_stream->tcp_passthough = 1;
_stream->defer_fd_downstream = 0;
_stream->defer_fd_upstream = 0;
}
else if (SSL_ACTION_SHUTDOWN == ssl_action)
{
-tfe_stream_destory(_stream);
+return tfe_stream_destory(_stream);
}
else
{
@@ -1232,7 +1247,8 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket
struct tfe_proxy_tcp_options * tcp_options = &_stream->proxy_ref->tcp_options;
/* Make it non-blocking */
-evutil_make_socket_nonblocking(fd);
+int ret = evutil_make_socket_nonblocking(fd);
+assert(ret >= 0);
/* Recv Buffer */
if (tcp_options->sz_rcv_buffer >= 0)
@@ -1307,6 +1323,8 @@ void __stream_fd_option_setup(struct tfe_stream_private * _stream, evutil_socket
TFE_LOG_ERROR(g_default_logger, "%s: Failed at setup FD's ttl option, ttl = %d, fd = %d",
stream->str_stream_info, __ttl, fd);
}
+(void)ret;
}
int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downstream, evutil_socket_t fd_upstream)
@@ -1325,8 +1343,7 @@ int tfe_stream_init_by_fds(struct tfe_stream * stream, evutil_socket_t fd_downst
if (unlikely(_stream->head.addr == NULL))
{
TFE_LOG_ERROR(_stream->stream_logger, "Failed to create address from fd %d, %d, terminate fds.",
-fd_downstream, fd_upstream);
-goto __errout;
+fd_downstream, fd_upstream); goto __errout;
}
_stream->str_stream_addr = tfe_stream_addr_to_str(_stream->head.addr);
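Both read callbacks above follow the same first-RX pattern: the first time data arrives on a stream, the stream is counted exactly once as either intercepted or bypassed (passthrough), and the is_first_call_rxcb flag prevents double counting. A hypothetical helper expressing that pattern (not code from this commit):

static void classify_stream_once(struct tfe_stream_private *_stream, int intercepted)
{
    if (_stream->is_first_call_rxcb != 0)
        return; /* this stream has already been classified */
    _stream->is_first_call_rxcb = 1;
    if (intercepted)
        TFE_PROXY_STAT_INCREASE(STAT_STREAM_INTERCEPT, 1);
    else
        TFE_PROXY_STAT_INCREASE(STAT_STREAM_BYPASS, 1);
}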

View File

@@ -13,7 +13,7 @@
#include <event2/event.h>
#include <event2/buffer.h>
extern "C"
{
#include <dablooms.h>
}
@@ -30,7 +30,7 @@ enum cache_stat_field
STAT_CACHE_READ_HIT,
STAT_CACHE_READ_BYTES,
STAT_CACHE_OVERRIDE_READ,
STAT_CACHE_OVERRIDE_READ_HIT,
STAT_CACHE_OVERRIDE_READ_BYTES,
STAT_CACHE_READ_ERR,
STAT_CACHE_READ_THROTTLE,
@@ -56,7 +56,7 @@ enum cache_stat_field
STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE,
__CACHE_STAT_MAX
};
struct cache_key_descr
{
int is_not_empty;
@@ -68,7 +68,7 @@ struct cache_param
{
int ref_cnt;
struct cache_key_descr key_descr;
char no_revalidate;
char cache_dyn_url;
char cache_html;
@@ -76,7 +76,7 @@ struct cache_param
char ignore_req_nocache;
char ignore_res_nocache;
char force_caching;
int min_use;
time_t pinning_time_sec;
time_t inactive_time_sec;
@@ -109,9 +109,9 @@ struct cache_handle
long long stat_val[__CACHE_STAT_MAX];
int fs_id[__CACHE_STAT_MAX];
struct event_base* gc_evbase;
struct event* gcev;
int cache_policy_enabled; //otherwise use default cache policy
struct cache_param default_cache_policy;
Maat_feather_t ref_feather;
@@ -184,7 +184,7 @@ static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg)
//translate bytes to mega bytes.
FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i]))/(1024*1024));
break;
default:
FS_operate(cache->fs_handle, cache->fs_id[i], 0, FS_OP_SET, ATOMIC_READ(&(cache->stat_val[i])));
break;
}
@@ -194,12 +194,12 @@ static void web_cache_stat_cb(evutil_socket_t fd, short what, void * arg)
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ], 0, FS_OP_SET, client_stat_sum.get_recv_num);
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_HIT], 0,
FS_OP_SET, client_stat_sum.get_succ_http+client_stat_sum.get_succ_redis);
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_READ_ERR], 0,
FS_OP_SET, client_stat_sum.get_err_http+client_stat_sum.get_err_redis);
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_CNT], 0,
FS_OP_SET, client_stat_sum.put_recv_num);
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_ERR], 0,
FS_OP_SET, client_stat_sum.put_err_http+client_stat_sum.put_err_redis);
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_MEMORY], 0, FS_OP_SET, client_stat_sum.memory_used/(1024*1024));
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_ACTIVE_SESSION_HTTP], 0, FS_OP_SET, client_stat_sum.session_http);
FS_operate(cache->fs_handle, cache->fs_id[STAT_CACHE_WRITE_THROTTLE], 0, FS_OP_SET, client_stat_sum.totaldrop_num);
@@ -220,12 +220,12 @@ static void set_stat_spec(struct cache_stat_sepc* spec, const char* name, enum f
spec->calc_type=calc_type;
return;
}
void cache_stat_init(struct cache_handle* cache,
const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins)
{
-const char* fieldstat_output="./cache.fieldstat";
+const char* fieldstat_output="log/cache.fs2";
const char* app_name="tfe_cache";
int value=0, i=0;
screen_stat_handle_t fs_handle=NULL;
fs_handle=FS_create_handle();
@@ -245,7 +245,7 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins)
cache->fs_handle=fs_handle;
struct cache_stat_sepc spec[__CACHE_STAT_MAX];
set_stat_spec(&spec[STAT_CACHE_READ], "cache_read",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_PEND_FORBIDDEN], "read_forbid",FS_STYLE_FIELD, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_READ_VERIFY], "read_verify",FS_STYLE_FIELD, FS_CALC_CURRENT);
@@ -279,7 +279,7 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins)
set_stat_spec(&spec[STAT_CACHE_WRITE_OBJ_SIZE], "wr_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
set_stat_spec(&spec[STAT_CACHE_OVERRIDE_WRITE_OBJ_SIZE], "or_obj_sz(KB)",FS_STYLE_HISTOGRAM, FS_CALC_CURRENT);
for(i=0;i<__CACHE_STAT_MAX;i++)
{
if(spec[i].style==FS_STYLE_HISTOGRAM)
@@ -295,8 +295,8 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins)
// FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
FS_register_ratio(cache->fs_handle,
cache->fs_id[STAT_CACHE_READ_HIT],
cache->fs_id[STAT_CACHE_READ],
1,
FS_STYLE_STATUS,
FS_CALC_CURRENT,
@@ -306,19 +306,19 @@ const char* statsd_server_ip, int statsd_server_port, const char*histogram_bins)
FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
FS_register_ratio(cache->fs_handle,
cache->fs_id[STAT_CACHE_OVERRIDE_READ_HIT],
cache->fs_id[STAT_CACHE_OVERRIDE_READ],
1,
FS_STYLE_STATUS,
FS_CALC_CURRENT,
"or_hit");
FS_start(cache->fs_handle);
struct timeval gc_delay = {0, 500*1000}; //Microseconds, we set 500 miliseconds here.
cache->gcev = event_new(cache->gc_evbase, -1, EV_PERSIST, web_cache_stat_cb, cache);
evtimer_add(cache->gcev, &gc_delay);
return;
}
@@ -447,7 +447,7 @@ char* url_remove_qs(const char* url, int qs_num, char* ignore_qs[])
char* target_url=ALLOC(char, target_size);
int i=0, shall_ignore=0;
char *token=NULL,*sub_token=NULL,*saveptr;
char* query_string=NULL;
query_string=strchr(url_copy, '?');
if(query_string!=NULL)
@@ -492,10 +492,10 @@ char* get_cache_key(const struct tfe_http_half * request, const struct cache_key
char* url_no_qs=NULL;
const char* cookie=NULL;
char cookie_val[1024]={0}; //most 1024 bytes for cookie key
size_t key_size=strlen(request->req_spec.url)+sizeof(cookie_val);
char* cache_key=ALLOC(char, key_size);
if(desc->qs_num>0)
{
url_no_qs=url_remove_qs(request->req_spec.url, desc->qs_num, desc->ignore_qs);
@@ -517,7 +517,7 @@ char* get_cache_key(const struct tfe_http_half * request, const struct cache_key
return cache_key;
}
void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_def_large,
MAAT_RULE_EX_DATA* ad, long argl, void *argp)
{
struct cache_handle* cache=(struct cache_handle*) argp;
@@ -536,7 +536,7 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de
return;
}
struct cache_param* param=ALLOC(struct cache_param, 1);
*param=cache->default_cache_policy;
param->ref_cnt=1;
pthread_mutex_init(&(param->lock), NULL);
@@ -545,7 +545,7 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de
{
qs=cJSON_GetObjectItem(key_desc,"ignore_qs");
if(qs && qs->type==cJSON_Array)
{
param->key_descr.qs_num=cJSON_GetArraySize(qs);
param->key_descr.ignore_qs=ALLOC(char*, param->key_descr.qs_num);
for(i=0; i<param->key_descr.qs_num; i++)
@@ -556,12 +556,12 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de
strncat(param->key_descr.ignore_qs[i], item->valuestring, len);
strncat(param->key_descr.ignore_qs[i], "=", len);
}
}
item=cJSON_GetObjectItem(key_desc,"cookie");
if(item && item->type==cJSON_String)
{
param->key_descr.include_cookie=tfe_strdup(item->valuestring);
}
if(param->key_descr.qs_num>0||param->key_descr.include_cookie!=NULL)
{
@@ -570,31 +570,31 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de
}
item=cJSON_GetObjectItem(json,"no_revalidate");
if(item && item->type==cJSON_Number) param->no_revalidate=item->valueint;
item=cJSON_GetObjectItem(json,"cache_dyn_url");
if(item && item->type==cJSON_Number) param->cache_dyn_url=item->valueint;
item=cJSON_GetObjectItem(json,"cache_cookied_cont");
if(item && item->type==cJSON_Number) param->cache_cookied_cont=item->valueint;
item=cJSON_GetObjectItem(json,"ignore_req_nocache");
if(item && item->type==cJSON_Number) param->ignore_req_nocache=item->valueint;
item=cJSON_GetObjectItem(json,"ignore_res_nocache");
if(item && item->type==cJSON_Number) param->ignore_res_nocache=item->valueint;
item=cJSON_GetObjectItem(json,"force_caching");
if(item && item->type==cJSON_Number) param->force_caching=item->valueint;
item=cJSON_GetObjectItem(json,"min_use");
if(item && item->type==cJSON_Number) param->min_use=item->valueint;
item=cJSON_GetObjectItem(json,"pinning_time");
if(item && item->type==cJSON_String) param->pinning_time_sec=time_unit_sec(item->valuestring);
item=cJSON_GetObjectItem(json,"inactive_time");
if(item && item->type==cJSON_String) param->inactive_time_sec=time_unit_sec(item->valuestring);
@@ -603,10 +603,10 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de
item=cJSON_GetObjectItem(json,"max_cache_obj_size"); item=cJSON_GetObjectItem(json,"max_cache_obj_size");
if(item && item->type==cJSON_String) param->max_cache_obj_size=storage_unit_byte(item->valuestring); if(item && item->type==cJSON_String) param->max_cache_obj_size=storage_unit_byte(item->valuestring);
item=cJSON_GetObjectItem(json,"min_cache_obj_size"); item=cJSON_GetObjectItem(json,"min_cache_obj_size");
if(item && item->type==cJSON_String) param->min_cache_obj_size=storage_unit_byte(item->valuestring); if(item && item->type==cJSON_String) param->min_cache_obj_size=storage_unit_byte(item->valuestring);
cJSON_Delete(json); cJSON_Delete(json);
*ad=param; *ad=param;
return; return;
@@ -622,7 +622,7 @@ void cache_param_free(int idx, const struct Maat_rule_t* rule, const char* srv_d
pthread_mutex_lock(&(param->lock));
param->ref_cnt--;
if(param->ref_cnt>0)
{
pthread_mutex_unlock(&(param->lock));
return;
}
@@ -688,11 +688,11 @@ static void cache_key_bloom_gc_cb(evutil_socket_t fd, short what, void * arg)
return;
}
struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section,
struct event_base* gc_evbase, Maat_feather_t feather, void *logger)
{
struct cache_handle* cache=ALLOC(struct cache_handle, 1);
int temp=0;
struct event* ev=NULL;
char statsd_server_ip[TFE_SYMBOL_MAX]={0};
char histogram_bins[TFE_SYMBOL_MAX]={0};
@@ -702,18 +702,18 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha
cache->thread_count=tfe_proxy_get_work_thread_count();
cache->clients=ALLOC(struct tango_cache_instance *, cache->thread_count);
cache->cache_key_bloom=ALLOC(struct cache_bloom, cache->thread_count);
struct cache_bloom* p_bloom=NULL;
MESA_load_profile_int_def(profile_path, section, "cache_policy_enabled",
&(cache->cache_policy_enabled), 1);
MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_size",
(int*)&(cache->cache_key_bloom_size), 16*1000*1000);
MESA_load_profile_int_def(profile_path, section, "cache_key_bloom_life",
&(cache->cache_key_bloom_life), 30*60);
struct timeval gc_refresh_delay = {cache->cache_key_bloom_life, 0};
unsigned int i=0;
struct tango_cache_parameter *cache_client_param=tango_cache_parameter_new(profile_path, section, logger);
for(i=0; i<cache->thread_count; i++)
{
@@ -729,17 +729,17 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha
{
goto error_out;
}
ev = event_new(tfe_proxy_get_work_thread_evbase(i), -1, EV_PERSIST, cache_key_bloom_gc_cb, p_bloom);
evtimer_add(ev, &gc_refresh_delay);
}
cache->clients[i]=tango_cache_instance_new(cache_client_param,tfe_proxy_get_work_thread_evbase(i), logger);
if(cache->clients[i]==NULL)
{
goto error_out;
}
}
MESA_load_profile_int_def(profile_path, section, "get_concurrency_max", &temp, 1000*1000);
cache->get_concurrency_max=temp;
MESA_load_profile_int_def(profile_path, section, "put_concurrency_max", &(temp), 1000*1000);
@@ -749,7 +749,7 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha
MESA_load_profile_int_def(profile_path, section, "cached_undefined_obj_minimum_size", &(temp), 100*1024);
cache->cache_undefined_obj_min_size=temp;
cache->gc_evbase=gc_evbase;
cache->default_cache_policy.key_descr.qs_num=0;
cache->default_cache_policy.no_revalidate=0;
@@ -762,7 +762,7 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha
cache->default_cache_policy.inactive_time_sec=0;
cache->default_cache_policy.max_cache_size=0;
MESA_load_profile_int_def(profile_path, section, "min_use", &(cache->default_cache_policy.min_use), 0);
MESA_load_profile_int_def(profile_path, section, "max_cache_obj_size", &(temp), 1024*1024*1024);
cache->default_cache_policy.max_cache_obj_size=temp; //<1GB by default
@@ -773,8 +773,8 @@ struct cache_handle* create_web_cache_handle(const char* profile_path, const cha
{
cache->table_url_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_URL");
cache->table_cookie_constraint=Maat_table_register(feather, "PXY_CACHE_HTTP_COOKIE");
cache->cache_param_idx=Maat_rule_get_ex_new_index(feather, "PXY_CACHE_COMPILE",
cache_param_new, cache_param_free, cache_param_dup,
0, cache);
cache->ref_feather=feather;
@@ -794,7 +794,7 @@ error_out:
}
static char* read_http1_hdr(const char* hdr, const char* field_name)
{
const char *p=NULL, *q=NULL;
char* value=NULL;
p=strcasestr(hdr, field_name);
@@ -823,7 +823,7 @@ struct cache_query_context
const struct cache_mid* ref_mid;
char* url;
struct cached_meta meta;
struct tango_cache_result* ref_tango_cache_result;
struct future* f_tango_cache_fetch;
};
@@ -907,16 +907,16 @@ static void cache_query_obj_on_succ(future_result_t * result, void * user)
ctx->meta.content_length=ctx->ref_tango_cache_result->tlength;
TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query hit: %s", ctx->url);
break;
case RESULT_TYPE_USERTAG:
cached_meta_set(&ctx->meta, RESULT_TYPE_USERTAG, ctx->ref_tango_cache_result->data_frag, ctx->ref_tango_cache_result->size);
break;
case RESULT_TYPE_MISS:
TFE_LOG_DEBUG(ctx->ref_handle->logger, "cache query miss: %s", ctx->url);
//NOT break intentionally.
case RESULT_TYPE_END:
//last call.
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_READING]));
promise_dettach_ctx(p);
last_call=1;
break;
case RESULT_TYPE_BODY:
@@ -928,7 +928,7 @@ static void cache_query_obj_on_succ(future_result_t * result, void * user)
promise_success(p, ctx);
if(last_call)
{
cache_query_ctx_free_cb(ctx);
promise_finish(p);
}
return;
@@ -939,7 +939,7 @@ static void cache_query_obj_on_fail(enum e_future_error err, const char * what,
struct cache_query_context* ctx=(struct cache_query_context*)promise_dettach_ctx(p);
promise_failed(p, err, what);
promise_finish(p);
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_READING]));
cache_query_ctx_free_cb(ctx);
return;
}
@@ -999,7 +999,7 @@ static void cache_read_meta_on_succ(future_result_t * result, void * user)
switch(_result->type)
{
case RESULT_TYPE_HEADER:
ctx->cached_obj_meta.content_length=_result->tlength;
cached_meta_set(&ctx->cached_obj_meta, RESULT_TYPE_HEADER, _result->data_frag, _result->size);
ctx->status=PENDING_RESULT_REVALIDATE;
@@ -1018,8 +1018,8 @@ static void cache_read_meta_on_succ(future_result_t * result, void * user)
case RESULT_TYPE_END:
//last call.
ctx->location=_result->location;
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING]));
promise_dettach_ctx(p);
promise_success(p, ctx);
cache_pending_ctx_free_cb(ctx);
break;
@@ -1032,8 +1032,8 @@ static void cache_read_meta_on_fail(enum e_future_error err, const char * what,
{
struct promise * p = (struct promise *) user;
struct cache_pending_context* ctx=(struct cache_pending_context*)promise_dettach_ctx(p);
promise_failed(p, err, what);
ATOMIC_DEC(&(ctx->ref_handle->stat_val[STAT_CACHE_PENDING]));
cache_pending_ctx_free_cb(ctx);
return;
}
@@ -1042,7 +1042,7 @@ static void cache_read_meta_on_fail(enum e_future_error err, const char * what,
#define CACHE_ACTION_BYPASS 0x80
enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, unsigned int thread_id,
const struct tfe_http_half * request, struct cache_mid** mid, struct future* f_revalidate)
{
enum cache_pending_result result=PENDING_RESULT_FOBIDDEN;
struct Maat_rule_t cache_policy;
struct cache_param* param=&(handle->default_cache_policy);
@@ -1056,14 +1056,14 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
if(cookie)
{
_mid->has_cookie=1;
}
_mid->is_dyn_url=is_dynamic_url(request->req_spec.url);
if(handle->cache_policy_enabled)
{
ret=Maat_full_scan_string(handle->ref_feather, handle->table_url_constraint, CHARSET_UTF8,
request->req_spec.url, strlen(request->req_spec.url),
&cache_policy, NULL, 1, &scan_mid, thread_id);
if(cookie && ret<=0)
{
@@ -1075,7 +1075,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
if(ret>0)
{
ex_data=Maat_rule_get_ex_data(handle->ref_feather, &cache_policy, handle->cache_param_idx);
if(ex_data!=NULL)
{
@@ -1093,13 +1093,13 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
_mid->cache_key=get_cache_key(request, &(param->key_descr));
}
TFE_LOG_DEBUG(handle->logger, "cache policy %d matched: url=%s alt-key=%s",
cache_policy.config_id,
request->req_spec.url,
_mid->cache_key!=NULL?_mid->cache_key:"null");
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_POLICY_MATCH]));
}
if(_mid->shall_bypass ||
(!param->force_caching && !param->cache_dyn_url && _mid->is_dyn_url && param->key_descr.qs_num==0) ||
(!param->force_caching && !param->cache_cookied_cont && _mid->has_cookie))
{
@@ -1143,7 +1143,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
}
else
{
result=PENDING_RESULT_REVALIDATE;
}
break;
}
@@ -1152,22 +1152,22 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
_mid->result=result;
return _mid->result;
}
struct tango_cache_meta_get meta;
memset(&meta, 0, sizeof(meta));
meta.url=_mid->cache_key!=NULL?_mid->cache_key:request->req_spec.url;
meta.get = _mid->req_fresshness;
struct promise* p=future_to_promise(f_revalidate);
struct cache_pending_context* ctx=ALLOC(struct cache_pending_context, 1);
ctx->status=PENDING_RESULT_FOBIDDEN;
ctx->ref_handle=handle;
ctx->url=tfe_strdup(request->req_spec.url);
ctx->req_if_modified_since=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_MODIFIED_SINCE));
ctx->req_if_none_match=tfe_strdup(tfe_http_std_field_read(request, TFE_HTTP_IF_NONE_MATCH));
promise_set_ctx(p, ctx, cache_pending_ctx_free_cb);
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_PENDING]));
ctx->f_tango_cache_fetch=future_create("_cache_pend", cache_read_meta_on_succ, cache_read_meta_on_fail, p);
ret=tango_cache_head_object(handle->clients[thread_id], ctx->f_tango_cache_fetch, &meta);
if(ret<0)
@@ -1178,7 +1178,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
return _mid->result;
}
_mid->result=PENDING_RESULT_REVALIDATE;
return _mid->result;
}
int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id,
@@ -1188,10 +1188,10 @@ int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id,
struct promise* p=NULL;
struct cache_mid* _mid=*mid;
assert(_mid->result!=PENDING_RESULT_FOBIDDEN);
if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_READING])) > ATOMIC_READ(&(handle->put_concurrency_max)))
{
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READ_THROTTLE]));
return -1;
}
@@ -1204,11 +1204,11 @@ int web_cache_async_read(struct cache_handle* handle, unsigned int thread_id,
query_ctx->ref_mid=_mid;
query_ctx->url=tfe_strdup(request->req_spec.url);
p=future_to_promise(f);
promise_allow_many_successes(p);
promise_set_ctx(p, query_ctx, cache_query_ctx_free_cb);
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_READING]));
query_ctx->f_tango_cache_fetch=future_create("_cache_read", cache_query_obj_on_succ, cache_query_obj_on_fail, p);
int ret=tango_cache_fetch_object(handle->clients[thread_id], query_ctx->f_tango_cache_fetch, &meta, _mid->location);
if(ret<0)
@@ -1247,7 +1247,7 @@ static void wrap_cache_write_on_fail(enum e_future_error err, const char * what,
cache_write_future_ctx_free(ctx);
}
struct cache_write_context* web_cache_write_start(struct cache_handle* handle, unsigned int thread_id,
const struct tfe_http_session * session, struct cache_mid **mid)
{
struct cache_write_context* write_ctx=NULL;
@@ -1259,9 +1259,9 @@ struct cache_write_context* web_cache_write_start(struct cache_handle* handle, u
char *tmp=NULL;
int i=0, is_undefined_obj=0;
size_t content_len=0;
const struct cache_param* param=NULL;
struct cache_mid* _mid=*mid;
if(_mid!=NULL && _mid->is_using_exception_param)
{
param=_mid->param;
@@ -1286,20 +1286,20 @@ struct cache_write_context* web_cache_write_start(struct cache_handle* handle, u
{
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITE_FORBIDEN]));
TFE_LOG_DEBUG(handle->logger, "cache write forbiden: %s", session->req->req_spec.url);
return NULL;
}
break;
case REVALIDATE:
case ALLOWED:
case UNDEFINED:
if(param->force_caching)
{
break;
}
else if(_mid->shall_bypass
|| (param->max_cache_obj_size!=0 && content_len > param->max_cache_obj_size)
|| (param->min_cache_obj_size > content_len)
|| (!param->cache_cookied_cont && _mid->has_cookie)
|| (!param->cache_html && _mid->is_html)
)
{
@@ -1319,8 +1319,8 @@ struct cache_write_context* web_cache_write_start(struct cache_handle* handle, u
break;
}
if(ATOMIC_READ(&(handle->stat_val[STAT_CACHE_WRITING])) > handle->get_concurrency_max)
{
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITE_THROTTLE]));
return NULL;
}
const char* key=NULL;
@@ -1346,7 +1346,7 @@ struct cache_write_context* web_cache_write_start(struct cache_handle* handle, u
}
}
ATOMIC_INC(&(handle->stat_val[STAT_CACHE_WRITING]));
struct tango_cache_meta_put meta;
memset(&meta, 0, sizeof(meta));
meta.url=_mid->cache_key?_mid->cache_key:session->req->req_spec.url;
@@ -1413,7 +1413,7 @@ int web_cache_write_end(struct cache_write_context* ctx)
{
//upload too slow or storage server error;
TFE_LOG_DEBUG(ctx->ref_cache_handle->logger, "cache upload failed: %s",ctx->future_ctx->url);
cache_write_future_ctx_free(ctx->future_ctx);
ctx->future_ctx=NULL;
ATOMIC_INC(&(ctx->ref_cache_handle->stat_val[STAT_CACHE_WRITE_ERR]));
ret=-1;