This repository has been archived on 2025-09-14. You can view files and clone it, but cannot push or open issues or pull requests.
Files
tango-tfe/common/src/tfe_rpc.cpp
2018-11-29 16:24:45 +08:00

264 lines
7.8 KiB
C++

#include "tfe_rpc.h"
#include "event2/http.h"
#include "event2/http_struct.h"
#include "event2/event.h"
#include "event2/buffer.h"
#include "event2/dns.h"
#include "event2/thread.h"
#include "tfe_utils.h"
#include "MESA/MESA_handle_logger.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <sys/queue.h>
#define MODULE_NAME "tfe_rpc"
struct tfe_rpc_ctx
{
struct event_base * evbase;
enum TFE_RPC_FLAG flag;
struct evhttp_connection* connection;
};
/*
//will be called after receiving and parsing the full header. It allows analyzing the header and possibly closing the connection by returning a value < 0.
int read_header_done_cb(struct evhttp_request* response, void* arg)
{
//struct promise* p = (struct promise*)arg;
printf("call read_header_done_cb!\n");
printf("< HTTP/1.1 %d %s\n", evhttp_request_get_response_code(response), evhttp_request_get_response_code_line(response));
return 0;
struct evkeyvalq* headers = evhttp_request_get_input_headers(response);
struct evkeyval* header;
TAILQ_FOREACH(header, headers, next){
MESA_handle_runtime_log(logger, RLOG_LV_INFO, MODULE_NAME, "< %s: %s\n", header->key, header->value);
}
MESA_handle_runtime_log(logger, RLOG_LV_INFO, MODULE_NAME, "< \n");
}
*/
static void tfe_rpc_promise_free_ctx(void* ctx)
{
struct tfe_rpc_ctx* _ctx=(struct tfe_rpc_ctx*)ctx;
evhttp_connection_free(_ctx->connection);
_ctx->connection=NULL;
free(ctx);
ctx = NULL;
return;
}
static void _wrapped_promise_success(struct promise* p, void* result)
{
struct tfe_rpc_ctx* ctx = (struct tfe_rpc_ctx*)promise_get_ctx(p);
struct tfe_rpc_response_result* _result = (struct tfe_rpc_response_result*)result;
if(ctx->flag == CHUNK_CB && _result->len > 0)
{
return;
}
/*
if(ctx->evbase)
{
event_base_loopexit(ctx->evbase, 0);
}
*/
//promise_dettach_ctx(p);
//tfe_rpc_promise_free_ctx(ctx);
promise_success(p, result);
return;
}
static void _wrapped_promise_failed(struct promise * p, enum e_future_error error, const char * what)
{
/*
struct tfe_rpc_ctx* ctx = (struct tfe_rpc_ctx*)promise_get_ctx(p);
if(ctx->evbase)
{
event_base_loopexit(ctx->evbase, 0);
}
*/
promise_failed(p, error, what);
//promise_dettach_ctx(p);
//ctx_destroy_cb(ctx);
return;
}
//will be called after every read of data with the same argument as the completion callback. Will never be called on an empty response. May drain the input buffer; it will be drained automatically on return.
//will drain automaticly
void read_chunk_cb(struct evhttp_request* response, void* arg)
{
struct promise* p = (struct promise*)arg;
//printf("call get_chunk_cb\n");
if(response == NULL)
{
return;
}
struct evbuffer* evbuf = evhttp_request_get_input_buffer(response);
if(evbuf == NULL)
{
return;
}
size_t evbuf_len = evbuffer_get_length(evbuf);
char* data = (char*)evbuffer_pullup(evbuf, evbuf_len);
//printf("data is %s\n", data==NULL ? "NULL":"NOT NULL");
struct tfe_rpc_response_result* result = ALLOC(struct tfe_rpc_response_result, 1);
result->status_code = evhttp_request_get_response_code(response);
result->status_msg = evhttp_request_get_response_code_line(response);
result->data = data;
result->len = evbuf_len;
_wrapped_promise_success(p, result);
free(result);
}
//The callback is executed when the request completed or an error occurred
void get_response_cb(struct evhttp_request* response, void* arg)
{
//printf("call get_response_cb\n");
read_chunk_cb(response, arg);
}
//On error, both the error callback and the regular callback will be called, error callback is called before the regular callback.
void request_error_cb(enum evhttp_request_error error, void* arg)
{
//printf("call request_error_cb\n");
struct promise* p = (struct promise*)arg;
switch(error)
{
case EVREQ_HTTP_TIMEOUT:
_wrapped_promise_failed(p, FUTURE_ERROR_TIMEOUT, "EVREQ_HTTP_TIMEOUT");
break;
case EVREQ_HTTP_EOF:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_EOF");
break;
case EVREQ_HTTP_INVALID_HEADER:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_INVALID_HEADER");
break;
case EVREQ_HTTP_BUFFER_ERROR:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_BUFFER_ERROR");
break;
case EVREQ_HTTP_REQUEST_CANCEL:
_wrapped_promise_failed(p, FUTURE_ERROR_CANCEL, "EVREQ_HTTP_REQUEST_CANCEL");
break;
case EVREQ_HTTP_DATA_TOO_LONG:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_DATA_TOO_LONG");
break;
default:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "EVREQ_HTTP_UNKOWN_EXCEPTION");
break;
}
}
//when to close a connection ???
//Set a callback for connection close
void connection_close_cb(struct evhttp_connection* connection, void* arg)
{
//printf("call connection_close_cb\n");
}
char* get_request_url(struct evhttp_uri* uri, int url_len)
{
const char* path = evhttp_uri_get_path(uri);
const char* query = evhttp_uri_get_query(uri);
char *request_url = NULL;
request_url = (char*)malloc(url_len);
if(path == NULL || strnlen(path, url_len) == 0)
{
snprintf(request_url, url_len, "/");
}
else
{
snprintf(request_url, url_len, "%s", path);
}
if(query && strnlen(query, url_len))
{
strncat(request_url, "?", url_len);
strncat(request_url, query, url_len);
}
return request_url;
}
//data is for POST. if method is GET, data should be NULL
void tfe_rpc_async_ask(struct future* f, const char* url, enum TFE_RPC_METHOD method, enum TFE_RPC_FLAG flag, const char* data, int data_len, struct event_base * evbase, struct evdns_base* dnsbase)
{
struct promise* p = future_to_promise(f);
struct tfe_rpc_ctx* ctx = ALLOC(struct tfe_rpc_ctx, 1);
ctx->evbase = evbase;
ctx->flag = flag;
promise_set_ctx(p, (void*)ctx, tfe_rpc_promise_free_ctx);
assert(evbase&&dnsbase);
struct evhttp_uri* uri = evhttp_uri_parse(url);
if(NULL == uri)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "parse url failed!");
goto error_out;
}
const char* host = evhttp_uri_get_host(uri);
if(!host)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "parse host failed!");
goto error_out;
}
int port = evhttp_uri_get_port(uri);
if(port < 0)
{
port = 80;
}
ctx->connection = evhttp_connection_base_new(evbase, dnsbase, host, port);
if (!ctx->connection)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "create connection failed!");
goto error_out;
}
evhttp_connection_set_closecb(ctx->connection, connection_close_cb, evbase);
struct evhttp_request* request = evhttp_request_new(get_response_cb, (void*)p);
//evhttp_request_set_header_cb(request, read_header_done_cb);
if(flag == CHUNK_CB)
{
evhttp_request_set_chunked_cb(request, read_chunk_cb);
}
evhttp_request_set_error_cb(request, request_error_cb);
evhttp_add_header(evhttp_request_get_output_headers(request), "Host", host);
int url_len = strlen(url);
char* request_url = get_request_url(uri, url_len);
//printf("request url is %s\n", request_url);
if(request_url == NULL)
{
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "get request url failed");
goto error_out;
}
switch(method)
{
case GET:
evhttp_make_request(ctx->connection, request, EVHTTP_REQ_GET, request_url);
break;
case POST:
evbuffer_add(request->output_buffer, data, data_len);
evhttp_make_request(ctx->connection, request, EVHTTP_REQ_POST, request_url);
break;
default:
_wrapped_promise_failed(p, FUTURE_ERROR_EXCEPTION, "method is invalid!");
goto error_out;
}
free(request_url);
evhttp_uri_free(uri);
return;
error_out:
if(uri) evhttp_uri_free(uri);
promise_dettach_ctx(p);
tfe_rpc_promise_free_ctx(ctx);
return;
}
struct tfe_rpc_response_result* tfe_rpc_release(void* result)
{
struct tfe_rpc_response_result* response = (struct tfe_rpc_response_result*)result;
return response;
}