初步确认future-promise的异步调用机制。目录和cmake待整理。

This commit is contained in:
zhengchao
2018-08-14 19:44:34 +08:00
parent 900dc5c7ba
commit c43a2e86a3
65 changed files with 18580 additions and 90 deletions

View File

@@ -1,37 +0,0 @@
enum e_future_error
{
FUTURE_ERROR_CANCEL
FUTURE_ERROR_EXCEPTION,
FUTURE_ERROR_TIMEOUT
}
typedef void (*future_cb_success_t)(void * result, void * user);
typedef void (*future_cb_failed_t)(enum e_future_error err, const char * what, void * user);
struct future
{
void * user;
future_cb_success_t * cb_success;
future_cb_failed_t * cb_failed;
};
struct promise
{
struct future f;
void * ctx;
}
typedef void * promise_t;
void future_cancel(struct future * future)
{
future->cancel();
}
void promise_success(struct promisc *, void * result);
void promise_fail();
struct future * XXXXXRpc()
{
cancel = ___XXXXCancel()
};

20
interface/future.h Normal file
View File

@@ -0,0 +1,20 @@
enum e_future_error
{
FUTURE_ERROR_CANCEL,
FUTURE_ERROR_EXCEPTION,
FUTURE_ERROR_TIMEOUT
};
struct promise;
struct future;
typedef void (*future_success_cb)(void * result, void * user);
typedef void (*future_failed_cb)(enum e_future_error err, const char * what, void * user);
typedef void (*promise_ctx_destroy_cb)(struct promise* p);
struct future* future_create(future_success_cb * cb_success, future_failed_cb * cb_failed, void * user);
struct future* promise_to_future(struct promise* p);
struct promise* future_to_promise(struct future* f);

421
interface/http1.h Normal file
View File

@@ -0,0 +1,421 @@
#ifndef TFE_HTTP_H
#define TFE_HTTP_H
#include <vector>
#include <memory>
#include <map>
#include <string>
#include <list>
#include "util.h"
#include "easylogging++.h"
class HttpConnection;
class HttpSession;
class HttpRequest;
class HttpResponse;
class Http
{
public:
/* 回调函数调用 */
using connection_cb_t = std::function<void(Http & ht, HttpConnection & ct)>;
/* 回调函数设置 */
void SetHttpConnectionNewCallback(connection_cb_t cb)
{ connection_new_cb_ = cb; }
void SetHttpConnectionCloseCallback(connection_cb_t cb)
{ connection_close_cb_ = cb; }
/* 回调函数调用 */
void TriggerConnectionNew(HttpConnection & ct)
{ return connection_new_cb_(*this, ct); }
void TriggerConnectionClose(HttpConnection & ct)
{ return connection_close_cb_(*this, ct); }
static std::unique_ptr<Http> Factory(TfeConfigParser & cfg);
private:
connection_cb_t connection_new_cb_;
connection_cb_t connection_close_cb_;
/* 压缩降级选项 */
std::vector<std::string> accept_encoding_strip_list_;
};
class HttpHeaders
{
public:
HttpHeaders() = default;
virtual ~HttpHeaders() = default;
using str_field_t = std::string;
using str_value_t = std::string;
virtual void Add(const str_field_t & str_field, const str_value_t & str_value) = 0;
virtual void Set(const str_field_t & str_field, const str_value_t & str_value) = 0;
virtual void Remove(const str_field_t & str_field) = 0;
using for_each_cb_t = std::function<bool(const str_field_t &, const str_value_t &)>;
virtual bool ForEachHeader(for_each_cb_t cb) const = 0;
virtual bool ForEachValueOfHeader(const str_field_t & str_field, for_each_cb_t cb) const = 0;
};
class HttpRequest
{
public:
HttpRequest() = default;
virtual ~HttpRequest() = default;
/* URL读取、设置接口 */
virtual const std::string & Url() const = 0;
virtual void Url(const std::string & url) = 0;
virtual const std::string & Uri() const = 0;
virtual void Uri(const std::string & url) = 0;
/* HttpHeaders */
virtual HttpHeaders & Headers() = 0;
virtual const HttpHeaders & cHeaders() const = 0;
/* Request Body */
using body_content_t = std::vector<char>;
using body_content_ptr_t = std::unique_ptr<body_content_t>;
/* Body读取、设置接口 */
virtual const body_content_t * Body() const = 0;
virtual void Body(body_content_ptr_t body) = 0;
/* Body的Stolen接口 */
virtual body_content_ptr_t StolenBody() = 0;
/* Bypass标记本请求为直通
* 当请求标记为直通时,转发数据,不再调用业务处理函数 */
virtual bool Bypass() = 0;
virtual void Bypass(bool is_bypass) = 0;
/* ReadOnly标记本请求为只读。
* 当一个请求为只读请求时业务不应修改它的内容底层处理Readonly的请求时应直接转发不缓存 */
virtual bool ReadOnly() = 0;
virtual void ReadOnly(bool is_readonly) = 0;
/* Forward标记本请求应被转发到对端
* 当请求标记为不转发时,该请求被丢弃 */
virtual bool Forward() = 0;
virtual void Forward(bool is_forward) = 0;
/* 完整标记,该请求是否已经完整可用 */
enum section_t
{
kSectionHeader, kSectionBody, kSecionMessage
};
virtual bool Complete(section_t section) = 0;
/* HTTP版本 */
using version_t = std::tuple<short, short>;
virtual version_t Version() = 0;
/* 构建接口根据结构化数据构建HTTP请求头部 */
virtual void Construct() = 0;
/* 调试接口 */
virtual std::string DumpToString() = 0;
};
class HttpResponse
{
public:
enum section_t
{
kSectionHeader,
kSectionBody,
kSectionMessage,
kSectionMax
};
enum section_state_t
{
kStateBegin,
kStateReading,
kStateComplete,
kStateStream,
kStateCalled,
kStateStolen
};
public:
HttpResponse() = default;
virtual ~HttpResponse() = default;
/* 响应码 */
virtual int ResponseCode() = 0;
virtual void ResponseCode(int cde) = 0;
/* HttpHeaders */
virtual HttpHeaders & Headers() = 0;
virtual const HttpHeaders & cHeaders() const = 0;
/* Request Body */
using body_content_t = std::vector<char>;
using body_content_ptr_t = std::unique_ptr<body_content_t>;
/* Body读取、设置接口 */
virtual const std::vector<const body_content_t *> Body() const = 0;
virtual void Body(std::vector<body_content_ptr_t> body) = 0;
virtual std::vector<body_content_ptr_t> StolenBody() = 0;
/* ReadOnly标记本请求为只读。
* 当一个请求为只读请求时业务不应修改它的内容底层处理Readonly的请求时应直接转发不缓存 */
virtual bool ReadOnly() = 0;
virtual void ReadOnly(bool is_readonly) = 0;
/* Forward标记本请求应被转发到对端
* 当请求标记为不转发时,该请求被丢弃 */
virtual bool Forward() = 0;
virtual void Forward(bool is_forward) = 0;
/* Bypass标记本应答为直通
* 当应答标记为直通时,转发数据,不再调用业务处理函数 */
virtual bool Bypass() = 0;
virtual void Bypass(bool is_bypass) = 0;
virtual section_state_t SectionState(section_t section) = 0;
/* 构建指令根据Object构建对应的Memory */
virtual void Construct() = 0;
/* 调试接口 */
virtual std::string DumpToString() = 0;
};
class HttpConnection
{
public:
HttpConnection() = default;
virtual ~HttpConnection() = default;
using http_connection_cb_t = std::function<void(HttpSession &)>;
/* 回调函数设置 */
virtual void SetSessionNewCallback(http_connection_cb_t cb)
{ session_new_cb_ = cb; }
virtual void SetSessionCloseCallback(http_connection_cb_t cb)
{ session_close_cb_ = cb; }
/* 四元组信息获取 */
virtual const struct sockaddr * SockAddrSource() const = 0;
virtual const struct sockaddr * SockAddrDest() const = 0;
virtual void Write(std::unique_ptr<HttpSession> http_session) = 0;
virtual void Close() = 0;
protected:
http_connection_cb_t session_new_cb_{nullptr};
http_connection_cb_t session_close_cb_{nullptr};
};
class HttpSession
{
public:
explicit HttpSession(HttpConnection & connection) : http_connection_(connection) {}
virtual ~HttpSession() { __dump_session(); }
using http_session_cb_t = std::function<void(HttpSession &)>;
HttpRequest & request() const
{ return *request_; }
void request(std::unique_ptr<HttpRequest> req)
{ request_ = std::move(req); }
HttpResponse & response() const
{ return *response_; }
void response(std::unique_ptr<HttpResponse> rsp)
{ response_ = std::move(rsp); }
HttpConnection & connection() const
{ return http_connection_; }
virtual void SetRequestHeadAerCallback(http_session_cb_t cb)
{ request_header_cb_ = cb; }
virtual void SetRequestBodyCallback(http_session_cb_t cb)
{ request_body_cb_ = cb; }
virtual void SetResponseHeaderCallback(http_session_cb_t cb)
{ response_header_cb_ = cb; }
virtual void SetResponseBodyCallback(http_session_cb_t cb)
{ response_body_cb_ = cb; }
virtual void CallRequestHeaderCallback()
{ __call_session_callback(request_header_cb_, tag_request_header_cb_); }
virtual void CallRequestBodyCallback()
{ __call_session_callback(request_body_cb_, tag_request_body_cb_); }
virtual void CallResponseHeaderCallback()
{ __call_session_callback(response_header_cb_, tag_response_header_cb_); }
virtual void CallResponseBodyCallback()
{ __call_session_callback(response_body_cb_, tag_response_body_cb_); }
enum CallbackTag
{
kCallbackTagIgnore,
kCallbackTagNormal,
kCallBackTagOnlyOnce,
kCallbackTagRepeat
};
virtual void SetRequestHeaderTag(enum CallbackTag tag)
{ tag_request_header_cb_ = tag; }
virtual void SetRequestBodyTag(enum CallbackTag tag)
{ tag_request_body_cb_ = tag; }
virtual void SetResponseHeaderTag(enum CallbackTag tag)
{ tag_response_header_cb_ = tag; }
virtual void SetResponseBodyTag(enum CallbackTag tag)
{ tag_response_body_cb_ = tag; }
/* 丢弃这一Session不转发 */
virtual void Drop()
{
/* Disable all callbacks */
SetRequestHeaderTag(kCallbackTagIgnore);
SetRequestBodyTag(kCallbackTagIgnore);
SetResponseHeaderTag(kCallbackTagIgnore);
SetResponseBodyTag(kCallbackTagIgnore);
/* Tag, please drop this session */
need_to_drop_ = true;
}
/* 直通不再处理这一Session中的任何内容 */
virtual void Bypass()
{
/* Disable all callbacks */
SetRequestHeaderTag(kCallbackTagIgnore);
SetRequestBodyTag(kCallbackTagIgnore);
SetResponseHeaderTag(kCallbackTagIgnore);
SetResponseBodyTag(kCallbackTagIgnore);
need_to_drop_ = false;
}
virtual bool NeedToDrop()
{
return need_to_drop_;
}
virtual bool NeedToBypass()
{
return (tag_request_header_cb_ == kCallbackTagIgnore &&
tag_request_body_cb_ == kCallbackTagIgnore &&
tag_response_header_cb_ == kCallbackTagIgnore &&
tag_response_body_cb_ == kCallbackTagIgnore) && (!need_to_drop_);
}
protected:
HttpConnection & http_connection_;
std::unique_ptr<HttpRequest> request_{nullptr};
std::unique_ptr<HttpResponse> response_{nullptr};
std::shared_ptr<void> context_{nullptr};
/* Session Callbacks */
http_session_cb_t request_header_cb_{nullptr};
http_session_cb_t request_body_cb_{nullptr};
http_session_cb_t response_header_cb_{nullptr};
http_session_cb_t response_body_cb_{nullptr};
/* Call tag */
enum CallbackTag tag_request_header_cb_{kCallBackTagOnlyOnce};
enum CallbackTag tag_request_body_cb_{kCallbackTagNormal};
enum CallbackTag tag_response_header_cb_{kCallBackTagOnlyOnce};
enum CallbackTag tag_response_body_cb_{kCallbackTagNormal};
/* Drop tag */
bool need_to_drop_{false};
private:
void __call_session_callback(const http_session_cb_t & cb, enum CallbackTag & cb_tag)
{
while (cb_tag != kCallbackTagIgnore)
{
cb(*this);
if (cb_tag == kCallbackTagNormal) break;
if (cb_tag == kCallBackTagOnlyOnce) cb_tag = kCallbackTagIgnore;
if (cb_tag == kCallbackTagRepeat) cb_tag = kCallBackTagOnlyOnce;
}
}
void __dump_session()
{
auto str_src_addr = sockaddr_to_string(http_connection_.SockAddrSource());
auto str_dst_addr = sockaddr_to_string(http_connection_.SockAddrDest());
auto str_request = request_->DumpToString();
auto str_response = response_->DumpToString();
std::string status{};
if (NeedToDrop()) status += "DROP";
if (NeedToBypass()) status += "BYPASS";
CLOG(DEBUG, "HttpSessionTrace") << str_src_addr << str_dst_addr
<< str_request << str_response << status;
}
};
std::unique_ptr<HttpRequest> HttpRequestFactory(int primary_version, int second_version);
std::unique_ptr<HttpResponse> HttpResponseFactory(short major_version, short minor_version);
#include "pxyconn.h"
class Http1Connection : public HttpConnection
{
public:
Http1Connection(struct bufferevent * bev_downstream, struct bufferevent * bev_upstream,
const struct sockaddr_storage & source, const struct sockaddr_storage & dest)
: bev_downstream_(bev_downstream), bev_upstream_(bev_upstream), sockaddr_source_(source), sockaddr_dest_(dest)
{}
~Http1Connection() = default;
void Close() override
{ need_to_close_ = true; };
bool NeedToClose()
{ return need_to_close_; }
int on_connection_read_request(pxy_conn_ctx_t * conn_ctx, pxy_conn_desc_t * conn_this,
pxy_conn_desc_t * conn_other);
int on_connection_read_response(pxy_conn_ctx_t * conn_ctx, pxy_conn_desc_t * conn_this,
pxy_conn_desc_t * conn_other);
int on_connection_close(pxy_conn_ctx_t * conn_ctx, struct bufferevent * bev);
void Write(std::unique_ptr<HttpSession> http_session) override;
const sockaddr * SockAddrSource() const override;
const sockaddr * SockAddrDest() const override;
private:
enum direction
{
kDirectionRequest,
kDirectionResponse
};
using http_sessions_t = std::list<std::unique_ptr<HttpSession>>;
http_sessions_t http_sessions_{};
HttpSession * create_new_session();
HttpSession * last_uncomplete_session(enum direction dir);
void drop_last_session();
void drop_first_session();
bool need_to_close_{false};
/* connection info */
struct sockaddr_storage sockaddr_source_;
struct sockaddr_storage sockaddr_dest_;
/* upstream bev */
struct bufferevent * bev_upstream_;
struct bufferevent * bev_downstream_;
};
#endif //TFE_HTTP_H

View File

@@ -1,52 +0,0 @@
#define TFE_STRING_MAX 2048
#define TFE_SYMBOL_MAX 64
struct tfe_conn_ctx
{
sockaddr_t dst;
struct pxy_conn_desc src;
struct pxy_conn_desc dst;
int cur_dir//1: c2s; 2:s2c
char* sni;
};
struct tfe_conn_inner
{
int a;
struct tfe_conn_ctx conn_desc;
void **proto_arg;
};
//Return 1 for identify as its ttraffic;
//Return 0 for unknown traffic;
typedef int proto_pend_cb_t(const struct tfe_conn_ctx* c, struct evbuffer *data, void **pme);
enum tfe_proto_action
{
PROTO_ATCION_FORWARD,
PROTO_ACTION_DEFER,
PROTO_ACTION_STEAL,
PROTO_ACTION_PASSTHROUGH,
PROTO_ACTION_CLOSE
};
typedef tfe_proto_action proto_read_cb_t(const struct tfe_conn_ctx* ctx, struct evbuffer *data, void **pme);
typedef void proto_close_cb_t(const struct tfe_conn_ctx* ctx, int ev, void **pme);
//typedef int proto_onwrite_cb_t(struct tfe_conn_ctx*, struct evbuffer *data, void **pme);
struct tfe_proto_module
{
char symbol[TFE_SYMBOL_MAX];
proto_pend_cb_t *on_pend;
proto_read_cb_t *on_read;
proto_close_cb_t *on_close;
// proto_onwrite_cb_t *onwrite;
};
int tfe_io_write(struct pxy_conn_desc* dest,int dir,struct evbuffer *data);
int tfe_xxx_proto_init(struct tfe_proto_module*m);

View File

@@ -138,7 +138,7 @@ enum tfe_bussiness_action
//@param event: bit AND of EV_HTTP_**
//@param body_frag: NULL for no body data.
typedef tfe_bussiness_action (*http_read_func)(const struct tfe_conn_ctx* ctx, const struct tfe_http_session* session, uint64_t event,struct evbuffer *body_frag, void **pme);
typedef tfe_bussiness_action (*http_read_func)(const struct tfe_stream* stream, const struct tfe_http_session* session, uint64_t event,struct evbuffer *body_frag, void **pme);
struct tfe_http_half *tfe_http_request_create(int major_version,int method, const char* uri, const char* host);
struct tfe_http_half *tfe_http_response_create(int major_version,int resp_code, struct evbuff* body);

10
interface/tfe_stat.cpp Normal file
View File

@@ -0,0 +1,10 @@
void* tfe_stat_init(void* fs_handle)
{
}
void tfe_stat_flush(void* handle)
{
}

23
interface/tfe_stat.h Normal file
View File

@@ -0,0 +1,23 @@
#include "tfe_types.h"
enum TFE_STAT_FIELD
{
STREAM_NUM=0,
STREAM_OPEN,
STREAM_CLOSE,
STREAM_ERROR,
SSL_NUM,
SSL_OPEN,
SSL_CLOSE,
SSL_ERROR,
SNI_PEAK_FAIL,
IN_BYTES,
OUT_BYTES,
TFE_STAT_MAX
};
struct tfe_stats
{
long long value[TFE_STAT_MAX];
void * fs_handle;
int fs_ids[TFE_STAT_MAX];
};

86
interface/tfe_stream.h Normal file
View File

@@ -0,0 +1,86 @@
#define TFE_STRING_MAX 2048
#define TFE_SYMBOL_MAX 64
enum tfe_session_proto
{
SESSION_PROTO_PLAIN=0,
SESSION_PROTO_SSL,
SESSION_PROTO_QUIC,
SESSION_PROTO_SPDY
};
enum tfe_app_proto
{
APP_PROTO_HTTP1,
APP_PROTO_HTTP2,
APP_PROTO_WS, //websocket
APP_PROTO_QUIC //QUIC is a protocol that cross session layer and application layer.
};
enum tfe_conn_dir
{
CONN_DIR_DOWNSTREAM=0, //From client to proxy, aka client-side.
CONN_DIR_UPSTREAM //From proxy to server, aka server-side.
};
enum tfe_conn_status
{
CONN_STATUS_NONE,
CONN_STATUS_ESTABLISHED,
CONN_STATUS_CLOSED,
};
/* single dst or src socket bufferevent descriptor */
struct tfe_conn
{
struct layer_addr addr;
enum tfe_conn_status status;
struct bufferevent *bev;
} ;
struct tfe_stream
{
struct tfe_conn upstream;
struct tfe_conn downstream;
void* application_pme;
};
//Return 1 for identify as its traffic;
//Return 0 for unknown traffic;
typedef int proto_pend_cb_t(const struct tfe_stream* stream, struct evbuffer *data, void **pme);
enum tfe_proto_action
{
PROTO_ATCION_FORWARD,
PROTO_ACTION_DEFER,
PROTO_ACTION_STEAL,
PROTO_ACTION_PASSTHROUGH,
PROTO_ACTION_CLOSE
};
enum tfe_proto_action_opt
{
ACTION_OPT_DEFER_TIME_TV, //value is "struct timeval " which defines in <time.h>, default: time defer is not enabled
ACTION_OPT_DEFER_BYTES, //value is size_t, default: defer entire evbufer
ACTION_OPT_FOWARD_BYTES, //value is size_t, default: forward entire evbufer
ACTION_OPT_CLOSE_DIR //value is enum tfe_conn_dir, default: close both side.
};
int tfe_proto_action_set_opt(const struct tfe_stream* stream,enum tfe_proto_action_opt type, void* value, size_t size);
typedef tfe_proto_action proto_read_cb_t(const struct tfe_stream* stream, struct evbuffer *data, enum tfe_conn_dir dir, void **pme);
typedef void proto_close_cb_t(const struct tfe_stream* stream, int ev, void **pme);
//typedef int proto_onwrite_cb_t(struct tfe_stream*, struct evbuffer *data, void **pme);
struct tfe_proto_module
{
char symbol[TFE_SYMBOL_MAX];
proto_pend_cb_t *on_pend;
proto_read_cb_t *on_read;
proto_close_cb_t *on_close;
// proto_onwrite_cb_t *onwrite;
};
int tfe_io_write(struct pxy_conn_desc* dest,int dir,struct evbuffer *data);
int tfe_xxx_proto_init(struct tfe_proto_module*m);