Merge branch 'develop-tfe3a' of git.mesalab.cn:tango/tfe into develop-tfe3a

This commit is contained in:
zhengchao
2018-11-20 19:43:14 +08:00
7 changed files with 66 additions and 39 deletions

View File

@@ -430,7 +430,7 @@ static inline struct tfe_http_half * tfe_http_session_response_create(struct tfe
static inline int tfe_http_in_request(enum tfe_http_event events) static inline int tfe_http_in_request(enum tfe_http_event events)
{ {
if ((events & EV_HTTP_REQ_HDR) | (events & EV_HTTP_REQ_BODY_BEGIN) | (events & EV_HTTP_REQ_BODY_END) if ((events & EV_HTTP_REQ_HDR) | (events & EV_HTTP_REQ_BODY_BEGIN) | (events & EV_HTTP_REQ_BODY_END)
| (events & EV_HTTP_REQ_BODY_CONT)) | (events & EV_HTTP_REQ_BODY_CONT)| (events & EV_HTTP_REQ_END))
{ {
return 1; return 1;
} }

View File

@@ -10,7 +10,7 @@
#include <MESA/field_stat2.h> #include <MESA/field_stat2.h>
const char* FP_HISTOGRAM_BINS="10,50,100,500"; const char* FP_HISTOGRAM_BINS="0.5,0.8,0.9,0.95";
struct future_promise_instance struct future_promise_instance
{ {
@@ -110,7 +110,7 @@ static long field_get_set_cb(void * data, const uchar * key, uint size, void * u
{ {
field_id=(int*)malloc(sizeof(int)*2); field_id=(int*)malloc(sizeof(int)*2);
snprintf(buff,sizeof(buff),"%s(ms)",(char*)key); snprintf(buff,sizeof(buff),"%s(ms)",(char*)key);
field_id[0]=FS_register(args->fs_handle, FS_STYLE_HISTOGRAM, FS_CALC_CURRENT, buff); field_id[0]=FS_register_histogram(args->fs_handle, FS_CALC_SPEED, buff, 1, 5*1000, 2);//1ms~5s
args->fsid_latency=field_id[0]; args->fsid_latency=field_id[0];
snprintf(buff,sizeof(buff),"%s%s",(char*)key,fail_str); snprintf(buff,sizeof(buff),"%s%s",(char*)key,fail_str);
field_id[1]=FS_register(args->fs_handle, FS_STYLE_FIELD, FS_CALC_SPEED,buff); field_id[1]=FS_register(args->fs_handle, FS_STYLE_FIELD, FS_CALC_SPEED,buff);

View File

@@ -208,6 +208,11 @@ struct tfe_stream_write_ctx * tfe_stream_write_frag_start(const struct tfe_strea
int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size) int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned char * data, size_t size)
{ {
if(w_ctx->_stream == NULL)
{
return -EPIPE;
}
struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);
int ret = 0; int ret = 0;
if (this_conn != NULL) if (this_conn != NULL)
@@ -224,8 +229,13 @@ int tfe_stream_write_frag(struct tfe_stream_write_ctx * w_ctx, const unsigned ch
void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx) void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
{ {
struct tfe_conn_private * this_conn = __this_conn(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private * this_conn;
struct tfe_conn_private * peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir); struct tfe_conn_private * peer_conn;
/* The connection terminated before this function call */
if (w_ctx->_stream == NULL) goto __out;
this_conn = __this_conn(w_ctx->_stream, w_ctx->dir);
peer_conn = __peer_conn(w_ctx->_stream, w_ctx->dir);
if (this_conn != NULL) if (this_conn != NULL)
{ {
@@ -233,20 +243,18 @@ void tfe_stream_write_frag_end(struct tfe_stream_write_ctx * w_ctx)
bufferevent_enable(peer_conn->bev, EV_READ); bufferevent_enable(peer_conn->bev, EV_READ);
} }
if (w_ctx->_stream != NULL) if (w_ctx->dir == CONN_DIR_DOWNSTREAM)
{ {
if (w_ctx->dir == CONN_DIR_DOWNSTREAM) assert(w_ctx->_stream->w_ctx_downstream == w_ctx);
{ w_ctx->_stream->w_ctx_downstream = NULL;
assert(w_ctx->_stream->w_ctx_downstream == w_ctx); }
w_ctx->_stream->w_ctx_downstream = NULL; else
} {
else assert(w_ctx->_stream->w_ctx_upstream == w_ctx);
{ w_ctx->_stream->w_ctx_upstream = NULL;
assert(w_ctx->_stream->w_ctx_upstream == w_ctx);
w_ctx->_stream->w_ctx_upstream = NULL;
}
} }
__out:
free(w_ctx); free(w_ctx);
} }
@@ -616,6 +624,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
struct tfe_conn_private ** ref_peer_conn{}; struct tfe_conn_private ** ref_peer_conn{};
struct ssl_stream ** ref_this_ssl_stream{}; struct ssl_stream ** ref_this_ssl_stream{};
struct ssl_stream ** ref_peer_ssl_stream{}; struct ssl_stream ** ref_peer_ssl_stream{};
struct tfe_stream_write_ctx ** ref_this_write_ctx{};
const char * __str_dir = NULL; const char * __str_dir = NULL;
if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM) if (__bev_dir(_stream, bev) == CONN_DIR_UPSTREAM)
@@ -624,6 +633,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
ref_peer_conn = &_stream->conn_downstream; ref_peer_conn = &_stream->conn_downstream;
ref_this_ssl_stream = &_stream->ssl_upstream; ref_this_ssl_stream = &_stream->ssl_upstream;
ref_peer_ssl_stream = &_stream->ssl_downstream; ref_peer_ssl_stream = &_stream->ssl_downstream;
ref_this_write_ctx = &_stream->w_ctx_upstream;
} }
if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM) if (__bev_dir(_stream, bev) == CONN_DIR_DOWNSTREAM)
@@ -632,6 +642,7 @@ static void __stream_bev_eventcb(struct bufferevent * bev, short events, void *
ref_peer_conn = &_stream->conn_upstream; ref_peer_conn = &_stream->conn_upstream;
ref_this_ssl_stream = &_stream->ssl_downstream; ref_this_ssl_stream = &_stream->ssl_downstream;
ref_peer_ssl_stream = &_stream->ssl_upstream; ref_peer_ssl_stream = &_stream->ssl_upstream;
ref_this_write_ctx = &_stream->w_ctx_downstream;
} }
if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF) if (events & BEV_EVENT_ERROR || events & BEV_EVENT_EOF)
@@ -665,6 +676,12 @@ __close_connection:
if (*ref_this_conn != NULL) if (*ref_this_conn != NULL)
{ {
/* There is a frag writter setted, need to clear the reference of stream in the writter to indicate the connection is closed */
if(*ref_this_write_ctx != NULL)
{
(*ref_this_write_ctx)->_stream = NULL;
}
__conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream); __conn_private_destory_with_ssl(ev_base, *ref_this_conn, *ref_this_ssl_stream);
*ref_this_conn = NULL; *ref_this_conn = NULL;
*ref_this_ssl_stream = NULL; *ref_this_ssl_stream = NULL;

View File

@@ -439,7 +439,6 @@ struct pangu_http_ctx
size_t datalen, unsigned int thread_id, struct pangu_http_ctx* ctx); size_t datalen, unsigned int thread_id, struct pangu_http_ctx* ctx);
enum cache_pending_result pending_result; enum cache_pending_result pending_result;
enum cache_query_status cache_query_status;
struct future *f_cache_pending, *f_cache_query; struct future *f_cache_pending, *f_cache_query;
struct tfe_http_session * ref_session; struct tfe_http_session * ref_session;
struct tfe_http_half* cache_revalidate_req; struct tfe_http_half* cache_revalidate_req;
@@ -1084,16 +1083,16 @@ static void cache_query_on_succ(future_result_t * result, void * user)
break; break;
case CACHE_QUERY_RESULT_END: case CACHE_QUERY_RESULT_END:
assert(ctx->cached_response!=NULL); assert(ctx->cached_response!=NULL);
ctx->cache_query_status=WEB_CACHE_HIT;
tfe_http_half_write_body_end(ctx->cached_response); tfe_http_half_write_body_end(ctx->cached_response);
//ownership has been transferred to http session, set to NULL. //ownership has been transferred to http session, set to NULL.
ctx->pending_result=PENDING_RESULT_HIT;
ctx->cached_response=NULL; ctx->cached_response=NULL;
assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz); assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz);
future_destroy(ctx->f_cache_query); future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL; ctx->f_cache_query=NULL;
break; break;
case CACHE_QUERY_RESULT_MISS: case CACHE_QUERY_RESULT_MISS:
ctx->cache_query_status=WEB_CACHE_MISS; ctx->pending_result=PENDING_RESULT_MISS;
ctx->resumed_cb=dummy_resume; ctx->resumed_cb=dummy_resume;
tfe_http_session_resume(ctx->ref_session); tfe_http_session_resume(ctx->ref_session);
future_destroy(ctx->f_cache_query); future_destroy(ctx->f_cache_query);
@@ -1109,7 +1108,6 @@ static void cache_query_on_fail(enum e_future_error err, const char * what, void
struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user; struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user;
future_destroy(ctx->f_cache_query); future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL; ctx->f_cache_query=NULL;
ctx->cache_query_status=WEB_CACHE_MISS;
if(!ctx->cached_response) if(!ctx->cached_response)
{ {
tfe_http_session_resume(ctx->ref_session); tfe_http_session_resume(ctx->ref_session);
@@ -1120,7 +1118,7 @@ static void cache_query_on_fail(enum e_future_error err, const char * what, void
tfe_http_half_write_body_end(ctx->cached_response); tfe_http_half_write_body_end(ctx->cached_response);
ctx->cached_response=NULL; ctx->cached_response=NULL;
} }
ctx->pending_result=PENDING_RESULT_MISS;
printf("cache query failed: %s %s\n", ctx->ref_session->req->req_spec.url, what); printf("cache query failed: %s %s\n", ctx->ref_session->req->req_spec.url, what);
} }
static void cache_pending_on_succ(future_result_t * result, void * user) static void cache_pending_on_succ(future_result_t * result, void * user)
@@ -1345,7 +1343,7 @@ void pangu_on_http_data(const struct tfe_stream * stream, const struct tfe_http_
} }
} }
if(tfe_http_in_response(events)&& ctx->pending_result==PENDING_RESULT_MISS) if(tfe_http_in_response(events) && ctx->pending_result==PENDING_RESULT_MISS)
{ {
cache_update(session, events, body_frag, frag_size, thread_id, ctx); cache_update(session, events, body_frag, frag_size, thread_id, ctx);
} }

View File

@@ -190,7 +190,7 @@ void cache_stat_init(struct cache_handle* cache)
{ {
const char* fieldstat_output="./cache.fieldstat"; const char* fieldstat_output="./cache.fieldstat";
const char* app_name="tango_cache"; const char* app_name="tango_cache";
const char* obj_size_bins_KB="10,100,1000,10000"; const char* obj_size_bins_KB="0.5,0.8,0.9,0.95";
int value=0, i=0; int value=0, i=0;
screen_stat_handle_t fs_handle=NULL; screen_stat_handle_t fs_handle=NULL;
@@ -238,7 +238,14 @@ void cache_stat_init(struct cache_handle* cache)
for(i=0;i<__CACHE_STAT_MAX;i++) for(i=0;i<__CACHE_STAT_MAX;i++)
{ {
cache->fs_id[i]=FS_register(cache->fs_handle, spec[i].style, spec[i].calc_type, spec[i].name); if(spec[i].style==FS_STYLE_HISTOGRAM)
{
cache->fs_id[i]=FS_register_histogram(cache->fs_handle, spec[i].calc_type, spec[i].name,1,10*1024*1024,2);
}
else
{
cache->fs_id[i]=FS_register(cache->fs_handle, spec[i].style, spec[i].calc_type, spec[i].name);
}
} }
// value=cache->fs_id[STAT_CACHE_QUERY_HIT]; // value=cache->fs_id[STAT_CACHE_QUERY_HIT];
// FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value)); // FS_set_para(cache->fs_handle, ID_INVISBLE, &value, sizeof(value));
@@ -492,7 +499,6 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de
key_desc=cJSON_GetObjectItem(json,"cache_key"); key_desc=cJSON_GetObjectItem(json,"cache_key");
if(key_desc && key_desc->type==cJSON_Object) if(key_desc && key_desc->type==cJSON_Object)
{ {
param->key_descr.is_not_empty=1;
qs=cJSON_GetObjectItem(key_desc,"ignore_qs"); qs=cJSON_GetObjectItem(key_desc,"ignore_qs");
if(qs && qs->type==cJSON_Array) if(qs && qs->type==cJSON_Array)
{ {
@@ -508,8 +514,15 @@ void cache_param_new(int idx, const struct Maat_rule_t* rule, const char* srv_de
} }
} }
item=cJSON_GetObjectItem(key_desc,"cookie"); item=cJSON_GetObjectItem(key_desc,"cookie");
if(item && item->type==cJSON_String) param->key_descr.include_cookie=tfe_strdup(item->valuestring); if(item && item->type==cJSON_String)
{
param->key_descr.include_cookie=tfe_strdup(item->valuestring);
}
if(param->key_descr.qs_num>0||param->key_descr.include_cookie!=NULL)
{
param->key_descr.is_not_empty=1;
}
} }
@@ -1003,7 +1016,7 @@ enum cache_pending_result web_cache_async_pending(struct cache_handle* handle, u
{ {
_mid->cache_key=get_cache_key(request, &(param->key_descr)); _mid->cache_key=get_cache_key(request, &(param->key_descr));
} }
TFE_LOG_DEBUG(handle->logger, "cache policy %d matched: url=%s alternative key=%s", TFE_LOG_DEBUG(handle->logger, "cache policy %d matched: url=%s alt-key=%s",
cache_policy.config_id, cache_policy.config_id,
request->req_spec.url, request->req_spec.url,
_mid->cache_key!=NULL?_mid->cache_key:"null"); _mid->cache_key!=NULL?_mid->cache_key:"null");
@@ -1199,7 +1212,7 @@ struct cache_update_context* web_cache_update_start(struct cache_handle* handle,
case ALLOWED: case ALLOWED:
case UNDEFINED: case UNDEFINED:
if(_mid->shall_bypass if(_mid->shall_bypass
|| content_len > param->max_cache_obj_size || (param->max_cache_obj_size!=0 && content_len > param->max_cache_obj_size)
|| (!param->cache_cookied_cont && _mid->has_cookie) || (!param->cache_cookied_cont && _mid->has_cookie)
|| (!param->cache_html && _mid->is_html)) || (!param->cache_html && _mid->is_html))
{ {

View File

@@ -4,15 +4,7 @@
#include <tfe_future.h> #include <tfe_future.h>
#include <MESA/Maat_rule.h> #include <MESA/Maat_rule.h>
enum cache_query_status
{
WEB_CACHE_BEFORE_QUERY=0,
WEB_CACHE_NOT_APPLICABLE,
WEB_CACHE_NEED_VERIFY,
WEB_CACHE_QUERY_DATA,
WEB_CACHE_MISS,
WEB_CACHE_HIT
};
struct cache_handle; struct cache_handle;
struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section, struct cache_handle* create_web_cache_handle(const char* profile_path, const char* section,
struct event_base* gc_evbase, Maat_feather_t feather, void *logger); struct event_base* gc_evbase, Maat_feather_t feather, void *logger);
@@ -49,6 +41,7 @@ enum cache_pending_result
PENDING_RESULT_FOBIDDEN, PENDING_RESULT_FOBIDDEN,
PENDING_RESULT_REVALIDATE, PENDING_RESULT_REVALIDATE,
PENDING_RESULT_ALLOWED, PENDING_RESULT_ALLOWED,
PENDING_RESULT_HIT,
PENDING_RESULT_MISS PENDING_RESULT_MISS
}; };
struct cache_mid; struct cache_mid;

View File

@@ -392,6 +392,12 @@ enum tfe_stream_action http_connection_entry(const struct tfe_stream * stream, e
hs_private->release_lock--; hs_private->release_lock--;
hs_private->suspend_counter++; hs_private->suspend_counter++;
/* Retrieve the user's stream action, because the user may set request/response at resume's callback */
if(hf_private_in->is_user_stream_action_set)
{
hf_private_in->stream_action = hf_private_in->user_stream_action;
}
/* Call user callback, tell user we resume from suspend */ /* Call user callback, tell user we resume from suspend */
assert(hs_private->resume_tag_singal); assert(hs_private->resume_tag_singal);
hs_private->resume_tag_singal = false; hs_private->resume_tag_singal = false;