#include "tfe_rpc.h"
#include "event2/http.h"
#include "event2/http_struct.h"
#include "event2/event.h"
#include "event2/buffer.h"
#include "event2/dns.h"
#include "event2/thread.h"
#include "tfe_utils.h"
#include "MESA/MESA_handle_logger.h"
/* NOTE(review): the targets of the five #include directives below are missing
 * in this copy of the file (presumably angle-bracket system headers) --
 * restore them from version control. */
#include
#include
#include
#include
#include

#define MODULE_NAME "tfe_rpc"

/* Per-request state attached to the promise by tfe_rpc_async_ask(). */
struct tfe_rpc_ctx
{
	struct event_base * evbase;	/* loop that is asked to exit once the promise is resolved */
	enum TFE_RPC_FLAG flag;		/* delivery mode requested by the caller (compared against CHUNK_CB) */
};

/*
 * Disabled header-inspection callback, kept for reference.
 *
//will be called after receiving and parsing the full header. It allows analyzing the header and possibly closing the connection by returning a value < 0.
int read_header_done_cb(struct evhttp_request* response, void* arg)
{
	//struct promise* p = (struct promise*)arg;
	printf("call read_header_done_cb!\n");
	printf("< HTTP/1.1 %d %s\n", evhttp_request_get_response_code(response), evhttp_request_get_response_code_line(response));
	return 0;
	struct evkeyvalq* headers = evhttp_request_get_input_headers(response);
	struct evkeyval* header;
	TAILQ_FOREACH(header, headers, next){
		MESA_handle_runtime_log(logger, RLOG_LV_INFO, MODULE_NAME, "< %s: %s\n", header->key, header->value);
	}
	MESA_handle_runtime_log(logger, RLOG_LV_INFO, MODULE_NAME, "< \n");
}
*/

/*
 * Destructor for the tfe_rpc_ctx attached to a promise; registered through
 * promise_set_ctx() in tfe_rpc_async_ask(). Simply releases the allocation.
 */
static void tfe_rpc_promise_free_ctx(void* ctx)
{
	free(ctx);
}

/*
 * Resolve promise `p` with `result` (a struct tfe_rpc_response_result*).
 *
 * When the request runs in CHUNK_CB mode and the result carries data
 * (len > 0), nothing happens: the loop keeps running and the promise is left
 * untouched. Otherwise the event loop stored in the ctx (if any) is asked to
 * exit and the promise is completed.
 */
static void _wrapped_promise_success(struct promise* p, void* result)
{
	struct tfe_rpc_ctx* rpc_ctx = (struct tfe_rpc_ctx*)promise_get_ctx(p);
	struct tfe_rpc_response_result* resp = (struct tfe_rpc_response_result*)result;
	/* resp is only inspected in CHUNK_CB mode (short-circuit evaluation) */
	int skip = (rpc_ctx->flag == CHUNK_CB) && (resp->len > 0);

	if(!skip)
	{
		if(rpc_ctx->evbase != NULL)
		{
			event_base_loopexit(rpc_ctx->evbase, 0);
		}
		promise_success(p, result);
	}
}

/*
 * Fail promise `p` with `error` / `what`, first asking the event loop stored
 * in the ctx (if any) to exit.
 */
static void _wrapped_promise_failed(struct promise * p, enum e_future_error error, const char * what)
{
	struct tfe_rpc_ctx* rpc_ctx = (struct tfe_rpc_ctx*)promise_get_ctx(p);
	struct event_base* loop = rpc_ctx->evbase;

	if(loop != NULL)
	{
		event_base_loopexit(loop, 0);
	}
	promise_failed(p, error, what);
}

//will be
called after every read of data with the same argument as the completion callback. Will never be called on an empty response. May drain the input buffer; it will be drained automatically on return. //will drain automaticly void read_chunk_cb(struct evhttp_request* response, void* arg) { struct promise* p = (struct promise*)arg; //printf("call get_chunk_cb\n"); if(response == NULL) { return; } struct evbuffer* evbuf = evhttp_request_get_input_buffer(response); if(evbuf == NULL) { return; } size_t evbuf_len = evbuffer_get_length(evbuf); char* data = (char*)evbuffer_pullup(evbuf, evbuf_len); //printf("data is %s\n", data==NULL ? "NULL":"NOT NULL"); struct tfe_rpc_response_result* result = ALLOC(struct tfe_rpc_response_result, 1); result->status_code = evhttp_request_get_response_code(response); result->status_msg = evhttp_request_get_response_code_line(response); result->data = data; result->len = evbuf_len; _wrapped_promise_success(p, result); free(result); } //The callback is executed when the request completed or an error occurred void get_response_cb(struct evhttp_request* response, void* arg) { //printf("call get_response_cb\n"); read_chunk_cb(response, arg); } //On error, both the error callback and the regular callback will be called, error callback is called before the regular callback. 
void request_error_cb(enum evhttp_request_error error, void* arg) { //printf("call request_error_cb\n"); struct promise* p = (struct promise*)arg; switch(error) { case EVREQ_HTTP_TIMEOUT: _wrapped_promise_failed(p, FUTURE_ERROR_TIMEOUT, "EVREQ_HTTP_TIMEOUT"); break; case EVREQ_HTTP_EOF: _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_EOF"); break; case EVREQ_HTTP_INVALID_HEADER: _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_INVALID_HEADER"); break; case EVREQ_HTTP_BUFFER_ERROR: _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_BUFFER_ERROR"); break; case EVREQ_HTTP_REQUEST_CANCEL: _wrapped_promise_failed(p, FUTURE_ERROR_CANCEL, "EVREQ_HTTP_REQUEST_CANCEL"); break; case EVREQ_HTTP_DATA_TOO_LONG: _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_DATA_TOO_LONG"); break; default: _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_UNKOWN_EXCEPTION"); break; } } //when to close a connection ??? //Set a callback for connection close void connection_close_cb(struct evhttp_connection* connection, void* arg) { //printf("call connection_close_cb\n"); } //data is for POST. 
if method is GET, data should be NULL void tfe_rpc_async_ask(struct future* f, const char* url, enum TFE_RPC_METHOD method, enum TFE_RPC_FLAG flag, const char* data, int data_len, struct event_base * evbase) { struct promise* p = future_to_promise(f); struct tfe_rpc_ctx* ctx = ALLOC(struct tfe_rpc_ctx, 1); ctx->evbase = evbase; ctx->flag = flag; promise_set_ctx(p, (void*)ctx, tfe_rpc_promise_free_ctx); if(!evbase) { _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "event base is NULL"); return; } struct evhttp_uri* uri = evhttp_uri_parse(url); if(NULL == uri) { _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "parse url failed!"); return; } const char* host = evhttp_uri_get_host(uri); if(!host) { _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "parse host failed!"); return; } int port = evhttp_uri_get_port(uri); if(port < 0) { port = 80; } const char* path = evhttp_uri_get_path(uri); const char* query = evhttp_uri_get_query(uri); char request_url[TFE_STRING_MAX] = ""; if(path == NULL || strnlen(path, TFE_STRING_MAX) == 0) { snprintf(request_url, TFE_STRING_MAX, "/"); } else { snprintf(request_url, TFE_STRING_MAX, "%s", path); } if(query && strnlen(query, TFE_STRING_MAX)) { strncat(request_url, "?", TFE_STRING_MAX); strncat(request_url, query, TFE_STRING_MAX); } //printf("url:%s host:%s port:%d path:%s query:%s request_url:%s\n", url, host, port, path, query, request_url); struct evdns_base* dnsbase = evdns_base_new(evbase, EVDNS_BASE_INITIALIZE_NAMESERVERS); if (!dnsbase) { _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "create dns base failed!"); return; } struct evhttp_connection* connection = evhttp_connection_base_new(evbase, dnsbase, host, port); if (!connection) { _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "create connection failed!"); return; } evhttp_connection_set_closecb(connection, connection_close_cb, evbase); struct evhttp_request* request = evhttp_request_new(get_response_cb, (void*)p); //evhttp_request_set_header_cb(request, 
read_header_done_cb); if(flag == CHUNK_CB) { evhttp_request_set_chunked_cb(request, read_chunk_cb); } evhttp_request_set_error_cb(request, request_error_cb); evhttp_add_header(evhttp_request_get_output_headers(request), "Host", host); switch(method) { case GET: evhttp_make_request(connection, request, EVHTTP_REQ_GET, request_url); break; case POST: evbuffer_add(request->output_buffer, data, data_len); evhttp_make_request(connection, request, EVHTTP_REQ_POST, request_url); break; default: _wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "method is invalid!"); return; } } struct tfe_rpc_response_result* tfe_rpc_release(void* result) { struct tfe_rpc_response_result* response = (struct tfe_rpc_response_result*)result; return response; }