2018-09-07 17:42:57 +08:00
# include "tfe_rpc.h"
# include "event2/http.h"
# include "event2/http_struct.h"
# include "event2/event.h"
# include "event2/buffer.h"
# include "event2/dns.h"
# include "event2/thread.h"
# include "tfe_utils.h"
# include "MESA/MESA_handle_logger.h"
# include <stdio.h>
# include <stdlib.h>
# include <string.h>
# include <assert.h>
# include <sys/queue.h>
# define MODULE_NAME "tfe_rpc"
/* Per-request context that tfe_rpc_async_ask() allocates and attaches to the promise. */
struct tfe_rpc_ctx
{
/* Event base passed to tfe_rpc_async_ask(); in this file it is only read by commented-out code. */
struct event_base * evbase ;
/* Request flag; _wrapped_promise_success() compares it against CHUNK_CB. */
enum TFE_RPC_FLAG flag ;
} ;
/*
//will be called after receiving and parsing the full header. It allows analyzing the header and possibly closing the connection by returning a value < 0.
int read_header_done_cb ( struct evhttp_request * response , void * arg )
{
//struct promise* p = (struct promise*)arg;
printf ( " call read_header_done_cb! \n " ) ;
printf ( " < HTTP/1.1 %d %s \n " , evhttp_request_get_response_code ( response ) , evhttp_request_get_response_code_line ( response ) ) ;
return 0 ;
struct evkeyvalq * headers = evhttp_request_get_input_headers ( response ) ;
struct evkeyval * header ;
TAILQ_FOREACH ( header , headers , next ) {
MESA_handle_runtime_log ( logger , RLOG_LV_INFO , MODULE_NAME , " < %s: %s \n " , header - > key , header - > value ) ;
}
MESA_handle_runtime_log ( logger , RLOG_LV_INFO , MODULE_NAME , " < \n " ) ;
}
*/
/*
 * Context destructor handed to promise_set_ctx(): gives the context memory
 * back to the allocator.
 *
 * ctx: pointer obtained from the allocator, or NULL (free() accepts both).
 */
static void tfe_rpc_promise_free_ctx(void *ctx)
{
    /* The parameter is a by-value copy, so there is nothing to reset afterwards. */
    free(ctx);
}
static void _wrapped_promise_success ( struct promise * p , void * result )
{
struct tfe_rpc_ctx * ctx = ( struct tfe_rpc_ctx * ) promise_get_ctx ( p ) ;
struct tfe_rpc_response_result * _result = ( struct tfe_rpc_response_result * ) result ;
if ( ctx - > flag = = CHUNK_CB & & _result - > len > 0 )
{
return ;
}
2018-09-19 17:06:42 +08:00
/*
2018-09-07 17:42:57 +08:00
if ( ctx - > evbase )
{
event_base_loopexit ( ctx - > evbase , 0 ) ;
}
2018-09-19 17:06:42 +08:00
*/
2018-09-07 17:42:57 +08:00
//promise_dettach_ctx(p);
//tfe_rpc_promise_free_ctx(ctx);
promise_success ( p , result ) ;
return ;
}
static void _wrapped_promise_failed ( struct promise * p , enum e_future_error error , const char * what )
{
2018-09-19 17:06:42 +08:00
/*
2018-09-07 17:42:57 +08:00
struct tfe_rpc_ctx * ctx = ( struct tfe_rpc_ctx * ) promise_get_ctx ( p ) ;
if ( ctx - > evbase )
{
event_base_loopexit ( ctx - > evbase , 0 ) ;
}
2018-09-19 17:06:42 +08:00
*/
2018-09-07 17:42:57 +08:00
promise_failed ( p , error , what ) ;
//promise_dettach_ctx(p);
//ctx_destroy_cb(ctx);
return ;
}
//Called after every read of body data, with the same argument as the completion callback. It is never called on an empty response. The callback may drain the input buffer itself; anything left over is drained automatically on return.
//The input buffer is drained automatically.
void read_chunk_cb ( struct evhttp_request * response , void * arg )
{
struct promise * p = ( struct promise * ) arg ;
//printf("call get_chunk_cb\n");
if ( response = = NULL )
{
return ;
}
struct evbuffer * evbuf = evhttp_request_get_input_buffer ( response ) ;
if ( evbuf = = NULL )
{
return ;
}
2018-09-13 20:25:12 +08:00
size_t evbuf_len = evbuffer_get_length ( evbuf ) ;
2018-09-07 17:42:57 +08:00
char * data = ( char * ) evbuffer_pullup ( evbuf , evbuf_len ) ;
//printf("data is %s\n", data==NULL ? "NULL":"NOT NULL");
struct tfe_rpc_response_result * result = ALLOC ( struct tfe_rpc_response_result , 1 ) ;
result - > status_code = evhttp_request_get_response_code ( response ) ;
result - > status_msg = evhttp_request_get_response_code_line ( response ) ;
result - > data = data ;
result - > len = evbuf_len ;
_wrapped_promise_success ( p , result ) ;
free ( result ) ;
}
//The callback is executed when the request completed or an error occurred
/*
 * Completion callback given to evhttp_request_new(). The finished response
 * takes the same path as a body chunk, so all handling (including the NULL
 * response check) lives in read_chunk_cb().
 */
void get_response_cb(struct evhttp_request *response, void *arg)
{
    read_chunk_cb(response, arg);
}
//On error, both the error callback and the regular callback are called; the error callback runs before the regular callback.
void request_error_cb ( enum evhttp_request_error error , void * arg )
{
//printf("call request_error_cb\n");
struct promise * p = ( struct promise * ) arg ;
switch ( error )
{
case EVREQ_HTTP_TIMEOUT :
_wrapped_promise_failed ( p , FUTURE_ERROR_TIMEOUT , " EVREQ_HTTP_TIMEOUT " ) ;
break ;
case EVREQ_HTTP_EOF :
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " EVREQ_HTTP_EOF " ) ;
break ;
case EVREQ_HTTP_INVALID_HEADER :
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " EVREQ_HTTP_INVALID_HEADER " ) ;
break ;
case EVREQ_HTTP_BUFFER_ERROR :
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " EVREQ_HTTP_BUFFER_ERROR " ) ;
break ;
case EVREQ_HTTP_REQUEST_CANCEL :
_wrapped_promise_failed ( p , FUTURE_ERROR_CANCEL , " EVREQ_HTTP_REQUEST_CANCEL " ) ;
break ;
case EVREQ_HTTP_DATA_TOO_LONG :
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " EVREQ_HTTP_DATA_TOO_LONG " ) ;
break ;
default :
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " EVREQ_HTTP_UNKOWN_EXCEPTION " ) ;
break ;
}
}
//when to close a connection ???
//Set a callback for connection close
/*
 * Close callback installed with evhttp_connection_set_closecb().
 * Deliberately a no-op: nothing in this file reacts to the connection
 * being closed.
 */
void connection_close_cb(struct evhttp_connection *connection, void *arg)
{
    (void)connection;
    (void)arg;
}
2018-09-19 17:06:42 +08:00
/*
 * Build the request target ("path" or "path?query") for a parsed URI.
 *
 * uri:     parsed URI to take the path and query from.
 * url_len: capacity of the returned buffer in bytes, terminator included.
 *          The result is truncated to fit, never written past the end.
 *
 * Returns a malloc()ed, NUL-terminated string that the caller must free(),
 * or NULL when url_len is not positive or the allocation fails.
 *
 * An absent or empty path is sent as "/". The query is appended only when
 * it is non-empty. The literals carry no padding because the result is
 * used verbatim as the HTTP request target.
 */
char *get_request_url(struct evhttp_uri *uri, int url_len)
{
    const char *path;
    const char *query;
    char *request_url;

    if (url_len <= 0)
    {
        return NULL;
    }

    path = evhttp_uri_get_path(uri);
    query = evhttp_uri_get_query(uri);
    if (path == NULL || path[0] == '\0')
    {
        path = "/";
    }

    request_url = (char *)malloc((size_t)url_len);
    if (request_url == NULL)
    {
        return NULL;
    }

    /*
     * One bounded snprintf per shape. The former strncat() calls took the
     * buffer size as their limit, but strncat's limit counts appended
     * characters, so they could write past the allocation.
     */
    if (query != NULL && query[0] != '\0')
    {
        snprintf(request_url, (size_t)url_len, "%s?%s", path, query);
    }
    else
    {
        snprintf(request_url, (size_t)url_len, "%s", path);
    }
    return request_url;
}
2018-09-07 17:42:57 +08:00
//data is for POST. if method is GET, data should be NULL
2018-11-26 14:54:20 +08:00
void tfe_rpc_async_ask ( struct future * f , const char * url , enum TFE_RPC_METHOD method , enum TFE_RPC_FLAG flag , const char * data , int data_len , struct event_base * evbase , struct evdns_base * dnsbase )
2018-09-07 17:42:57 +08:00
{
struct promise * p = future_to_promise ( f ) ;
struct tfe_rpc_ctx * ctx = ALLOC ( struct tfe_rpc_ctx , 1 ) ;
ctx - > evbase = evbase ;
ctx - > flag = flag ;
promise_set_ctx ( p , ( void * ) ctx , tfe_rpc_promise_free_ctx ) ;
2018-11-26 14:54:20 +08:00
assert ( evbase & & dnsbase ) ;
2018-09-07 17:42:57 +08:00
struct evhttp_uri * uri = evhttp_uri_parse ( url ) ;
if ( NULL = = uri )
{
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " parse url failed! " ) ;
return ;
}
const char * host = evhttp_uri_get_host ( uri ) ;
if ( ! host )
{
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " parse host failed! " ) ;
return ;
}
int port = evhttp_uri_get_port ( uri ) ;
if ( port < 0 )
{
port = 80 ;
}
2018-11-26 14:54:20 +08:00
2018-09-07 17:42:57 +08:00
struct evhttp_connection * connection = evhttp_connection_base_new ( evbase , dnsbase , host , port ) ;
if ( ! connection )
{
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " create connection failed! " ) ;
return ;
}
evhttp_connection_set_closecb ( connection , connection_close_cb , evbase ) ;
struct evhttp_request * request = evhttp_request_new ( get_response_cb , ( void * ) p ) ;
//evhttp_request_set_header_cb(request, read_header_done_cb);
if ( flag = = CHUNK_CB )
{
evhttp_request_set_chunked_cb ( request , read_chunk_cb ) ;
}
evhttp_request_set_error_cb ( request , request_error_cb ) ;
evhttp_add_header ( evhttp_request_get_output_headers ( request ) , " Host " , host ) ;
2018-09-19 17:06:42 +08:00
int url_len = strlen ( url ) ;
char * request_url = get_request_url ( uri , url_len ) ;
//printf("request url is %s\n", request_url);
if ( request_url = = NULL )
{
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " get request url failed " ) ;
return ;
}
2018-09-07 17:42:57 +08:00
switch ( method )
{
case GET :
evhttp_make_request ( connection , request , EVHTTP_REQ_GET , request_url ) ;
break ;
case POST :
evbuffer_add ( request - > output_buffer , data , data_len ) ;
evhttp_make_request ( connection , request , EVHTTP_REQ_POST , request_url ) ;
break ;
default :
_wrapped_promise_failed ( p , FUTURE_ERROR_EXCEPTION , " method is invalid! " ) ;
return ;
}
2018-09-19 17:06:42 +08:00
free ( request_url ) ;
2018-09-07 17:42:57 +08:00
}
/*
 * View an opaque result pointer as a struct tfe_rpc_response_result.
 *
 * Despite the name, nothing is freed or detached here: the pointer is only
 * cast and handed back, so a NULL argument yields NULL.
 */
struct tfe_rpc_response_result *tfe_rpc_release(void *result)
{
    return (struct tfe_rpc_response_result *)result;
}