业务层适配HTTP流式写消息体的接口.

This commit is contained in:
zhengchao
2018-10-16 16:51:15 +08:00
parent 95c57fe8a5
commit f567fba70b
4 changed files with 55 additions and 65 deletions

View File

@@ -253,10 +253,8 @@ struct pangu_http_ctx
enum cache_query_status cache_query_status;
struct future* f_cache_query;
struct tfe_http_session * ref_session;
struct cached_meta* cached_header;
struct evbuffer* cached_body;
struct tfe_http_half* cached_response;
size_t cache_result_declared_sz, cache_result_actual_sz;
struct cache_update_context* cache_update_ctx;
int thread_id;
@@ -307,16 +305,8 @@ static void pangu_http_ctx_free(struct pangu_http_ctx * ctx)
web_cache_update_end(ctx->cache_update_ctx);
ctx->cache_update_ctx=NULL;
}
if(ctx->cached_header)
{
cache_query_free_meta(ctx->cached_header);
ctx->cached_header=NULL;
}
if(ctx->cached_body)
{
evbuffer_free(ctx->cached_body);
ctx->cached_body=NULL;
}
assert(ctx->cached_response==NULL);
if(ctx->f_cache_query)
{
future_destroy(ctx->f_cache_query);
@@ -447,36 +437,57 @@ static void cache_query_on_succ(future_result_t * result, void * user)
struct pangu_http_ctx * ctx = (struct pangu_http_ctx *)user;
struct cached_meta* meta=NULL;
enum cache_query_result_type type=cache_query_result_get_type(result);
const unsigned char* data=NULL;
size_t data_sz=0;
char temp[TFE_STRING_MAX];
switch(type)
{
case CACHE_QUERY_RESULT_META:
meta=cache_query_result_get_header(result);
ctx->cache_result_declared_sz=meta->content_length;
ctx->resume_from_cache_query=1;
tfe_http_session_resume(ctx->ref_session);
ctx->cached_response=tfe_http_session_response_create(ctx->ref_session, 200);
tfe_http_nonstd_field_write(ctx->cached_response, "X-Cache-Lookup", "Hit From MESA-TFE");
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_TYPE, meta->content_type);
snprintf(temp, sizeof(temp), "%lu", meta->content_length);
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_LENGTH, temp);
tfe_http_session_response_set(ctx->ref_session, ctx->cached_response);
tfe_http_half_write_body_begin(ctx->cached_response, 1);
cache_query_free_meta(meta);
meta=NULL;
break;
case CACHE_QUERY_RESULT_DATA:
data_sz=cache_query_result_get_data(result, &data);
tfe_http_half_write_body_data(ctx->cached_response, data, data_sz);
ctx->cache_result_actual_sz+=data_sz;
break;
case CACHE_QUERY_RESULT_END:
assert(ctx->cached_response!=NULL);
ctx->cache_query_status=WEB_CACHE_HIT;
tfe_http_half_write_body_end(ctx->cached_response);
printf("cache query hit: %s\n", ctx->ref_session->req->req_spec.url);
//ownership has been transferred to http session, set to NULL.
ctx->cached_response=NULL;
assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz);
future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL;
break;
case CACHE_QUERY_RESULT_MISS:
ctx->cache_query_status=WEB_CACHE_NOT_HIT;
printf("cache query miss: %s\n", ctx->ref_session->req->req_spec.url);
goto __cleanup;
break;
case CACHE_QUERY_RESULT_END:
assert(ctx->cached_body!=NULL);
ctx->cache_query_status=WEB_CACHE_HIT;
printf("cache query hit: %s\n", ctx->ref_session->req->req_spec.url);
goto __cleanup;
break;
case CACHE_QUERY_RESULT_META:
ctx->cached_header=cache_query_result_get_header(result);
ctx->cached_body=evbuffer_new();
break;
case CACHE_QUERY_RESULT_DATA:
cache_query_result_append_data(ctx->cached_body, result);;
break;
default:
assert(0);
}
return;
__cleanup:
ctx->resume_from_cache_query=1;
tfe_http_session_resume(ctx->ref_session);
future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL;
tfe_http_session_resume(ctx->ref_session);
ctx->resume_from_cache_query=1;
break;
default:
break;
}
return;
}
static void cache_query_on_fail(enum e_future_error err, const char * what, void * user)
@@ -882,26 +893,6 @@ void cache_update(const struct tfe_http_session * session, enum tfe_http_event e
}
}
void cache_make_response(const struct tfe_http_session * session, struct pangu_http_ctx * ctx)
{
size_t cont_len=0;
struct tfe_http_session * wr_session=tfe_http_session_allow_write(session);
struct tfe_http_half* cached_response=tfe_http_session_response_create(wr_session, 200);
tfe_http_nonstd_field_write(cached_response, "X-Cache-Lookup", "Hit From MESA-TFE");
tfe_http_std_field_write(cached_response, TFE_HTTP_CONT_TYPE, ctx->cached_header->content_type);
char temp[TFE_STRING_MAX];
snprintf(temp, sizeof(temp), "%lu", ctx->cached_header->content_length);
tfe_http_std_field_write(cached_response, TFE_HTTP_CONT_LENGTH, temp);
assert(ctx->cached_header->content_length==evbuffer_get_length(ctx->cached_body));
tfe_http_half_append_body(cached_response, (char*)evbuffer_pullup(ctx->cached_body, -1), evbuffer_get_length(ctx->cached_body), 0);
tfe_http_half_append_body(cached_response, NULL, 0, 0);
tfe_http_session_response_set(wr_session, cached_response);
cache_query_free_meta(ctx->cached_header);
ctx->cached_header=NULL;
evbuffer_free(ctx->cached_body);
ctx->cached_body=NULL;
}
void pangu_on_http_begin(const struct tfe_stream * stream,
const struct tfe_http_session * session, unsigned int thread_id, void ** pme)
@@ -973,7 +964,6 @@ void pangu_on_http_data(const struct tfe_stream * stream, const struct tfe_http_
{
//resume from cache query.
assert(ctx->action==PG_ACTION_NONE);
cache_make_response(session, ctx);
tfe_http_session_detach(session);
return;
}

View File

@@ -92,12 +92,12 @@ struct cached_meta* cache_query_result_get_header(future_result_t * result)
meta->content_type=read_http1_hdr((const char*)cache_result->data_frag, "content-type");
return meta;
}
void cache_query_result_append_data(struct evbuffer* buf, future_result_t * result)
size_t cache_query_result_get_data(future_result_t * result, const unsigned char** pp_data)
{
struct tango_cache_result* cache_result=tango_cache_read_result(result);
assert(cache_result->type==RESULT_TYPE_BODY);
evbuffer_add(buf, cache_result->data_frag, cache_result->size);
return;
*pp_data=(const unsigned char*)cache_result->data_frag;
return cache_result->size;
}
enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsigned int thread_id,

View File

@@ -20,7 +20,7 @@ struct cached_meta
};
struct cached_meta* cache_query_result_get_header(future_result_t * result);
void cache_query_free_meta(struct cached_meta* meta);
void cache_query_result_append_data(struct evbuffer* buf, future_result_t * result);
size_t cache_query_result_get_data(future_result_t * result, const unsigned char** pp_data);
enum cache_query_status async_web_cache_query(struct cache_handle* handle, unsigned int thread_id,
const struct tfe_http_half * request, struct future* f);

View File

@@ -79,12 +79,12 @@ static int __write_http_half_to_line(const struct tfe_stream * stream,
* and has been call body_begin, construct the header */
if (hf_private->evbuf_body != NULL && hf_private->write_ctx == NULL)
{
ret = __write_http_half(hf_private, stream, dir);
if (unlikely(ret < 0)) return ret;
/* Alloc stream write ctx */
hf_private->write_ctx = tfe_stream_write_frag_start(stream, dir);
if (unlikely(hf_private->write_ctx == NULL)) return -1;
ret = __write_http_half(hf_private, stream, dir);
if (unlikely(ret < 0)) return ret;
}
}
else