TSG-18286 Proxy支持虚拟表表名变更,删除代理本地缓存,删除tsg-http相关配置

This commit is contained in:
fengweihao
2023-12-29 14:39:03 +08:00
parent b011a92680
commit ced991b4a0
54 changed files with 34 additions and 8443 deletions

View File

@@ -2,7 +2,6 @@
#include "edit_element.h"
#include "pattern_replace.h"
#include "http_lua.h"
#include "tsg_proxy_web_cache.h"
#include <tfe_proxy.h>
#include <tfe_stream.h>
@@ -31,8 +30,6 @@
#include <string.h>
#include <sys/types.h>
#include "ratelimiter.h"
#define MAX_EDIT_ZONE_NUM 64
enum proxy_action //Bigger action number is prior.
@@ -162,10 +159,6 @@ struct tsg_proxy_rt
int plolicy_table_id[POLICY_PROFILE_TABLE_MAX];
ctemplate::Template * tpl_403, * tpl_404, * tpl_451;
long long suspend_max;
int cache_enabled;
struct cache_handle* cache;
screen_stat_handle_t fs_handle;
long long stat_val[__PX_STAT_MAX];
int fs_id[__PX_STAT_MAX];
@@ -173,7 +166,6 @@ struct tsg_proxy_rt
struct event* gcev;
struct tsg_lua_script lua_script;
Ratelimiter_handle_t ratelimiter;
int enable_rate;
int ctrl_plugin_idx;
@@ -1022,8 +1014,8 @@ int proxy_policy_init(const char* profile_path, const char* static_section, cons
table_name[PXY_CTRL_SOURCE_ADDR] = "ATTR_SOURCE_ADDR";
table_name[PXY_CTRL_DESTINATION_ADDR]="ATTR_DESTINATION_ADDR";
table_name[PXY_CTRL_HTTP_URL] = "ATTR_HTTP_URL";
table_name[PXY_CTRL_HTTP_FQDN] = "ATTR_HTTP_HOST_VIRTUAL";
table_name[PXY_CTRL_HTTP_FQDN_CAT] = "ATTR_HTTP_HOST_CAT_VIRTUAL";
table_name[PXY_CTRL_HTTP_FQDN] = "ATTR_SERVER_FQDN";
table_name[PXY_CTRL_HTTP_FQDN_CAT] = "ATTR_SERVER_FQDN_CAT";
table_name[PXY_CTRL_HTTP_REQ_HDR] = "ATTR_HTTP_REQ_HDR";
table_name[PXY_CTRL_HTTP_REQ_BODY] = "ATTR_HTTP_REQ_BODY";
table_name[PXY_CTRL_HTTP_RES_HDR] = "ATTR_HTTP_RES_HDR";
@@ -1084,64 +1076,12 @@ error_out:
return ret;
}
Ratelimiter_handle_t ratelimit_handle_create(const char* profile_path, const char* static_section)
{
int ret=0, field_stat=0;
int redis_db_idx = 0;
int redis_port = 0, interval_sec = 0;
char redis_server[TFE_STRING_MAX] = {0};
char token_name[TFE_STRING_MAX] = {0};
Ratelimiter_handle_t ratelimit = NULL;
MESA_load_profile_int_def(profile_path, static_section, "enable", &(g_proxy_rt->enable_rate), 0);
MESA_load_profile_int_def(profile_path, static_section, "redis_port", &(redis_port), 6379);
MESA_load_profile_string_def(profile_path, static_section, "redis_server", redis_server, sizeof(redis_server), "");
MESA_load_profile_string_def(profile_path, static_section, "token_name", token_name, sizeof(token_name), "");
MESA_load_profile_int_def(profile_path, static_section, "redis_db_index", &(redis_db_idx), 0);
MESA_load_profile_int_def(profile_path, static_section, "interval_sec", &(interval_sec), 1);
if (g_proxy_rt->enable_rate != 1)
{
return NULL;
}
ratelimit=Ratelimiter_create(token_name, g_proxy_rt->local_logger);
Ratelimiter_set_opt(ratelimit, RATELIMITER_OPT_INTERVAL_SEC, &interval_sec, sizeof(interval_sec));
Ratelimiter_set_opt(ratelimit, RATELIMITER_OPT_REDIS_IP, redis_server, strlen(redis_server) + 1);
Ratelimiter_set_opt(ratelimit, RATELIMITER_OPT_REDIS_PORT, &redis_port, sizeof(redis_port));
Ratelimiter_set_opt(ratelimit, RATELIMITER_OPT_REDIS_INDEX, &redis_db_idx, sizeof(redis_db_idx));
/*field stata for debug test**/
MESA_load_profile_int_def(profile_path, static_section, "field_stat", &(field_stat), 0);
if (field_stat == 1)
{
MESA_load_profile_int_def(profile_path, static_section, "stat_port", &(redis_port), 6379);
MESA_load_profile_string_def(profile_path, static_section, "stat_server", redis_server, sizeof(redis_server), "");
Ratelimiter_set_opt(ratelimit, RATELIMITER_OPT_FIELD_STAT, &field_stat, sizeof(field_stat));
Ratelimiter_set_opt(ratelimit, RATELIMITER_OPT_STAT_IP, redis_server, strlen(redis_server));
Ratelimiter_set_opt(ratelimit, RATELIMITER_OPT_STAT_PORT, &redis_port, sizeof(redis_port));
Ratelimiter_set_opt(ratelimit, RATELIMITER_OPT_STAT_PATH, token_name, sizeof(token_name));
Ratelimiter_stat_init(ratelimit);
}
ret = Ratelimiter_start(ratelimit);
if (ret < 0)
{
TFE_LOG_ERROR(g_proxy_rt->local_logger, "%s Ratelimiter init failed.", __FUNCTION__);
goto error_out;
}
return ratelimit;
error_out:
Ratelimiter_stop(ratelimit);
return NULL;
}
int proxy_http_init(struct tfe_proxy * proxy)
{
const char * profile_path = "./conf/pangu/pangu_pxy.conf";;
int temp=0;
const char * profile_path = "./conf/tfe/tfe.conf";;
g_proxy_rt = ALLOC(struct tsg_proxy_rt, 1);
MESA_load_profile_int_def(profile_path, "DEBUG", "enable_plugin", &(g_proxy_rt->enable_plugin), 1);
MESA_load_profile_int_def(profile_path, "tsg_http", "enable_plugin", &(g_proxy_rt->enable_plugin), 1);
if (!g_proxy_rt->enable_plugin)
{
return 0;
@@ -1152,7 +1092,7 @@ int proxy_http_init(struct tfe_proxy * proxy)
g_proxy_rt->local_logger = (void *)MESA_create_runtime_log_handle("tsg_http", RLOG_LV_DEBUG);
g_proxy_rt->send_logger = proxy_log_handle_create(profile_path, "LOG", g_proxy_rt->local_logger);
g_proxy_rt->send_logger = proxy_log_handle_create(profile_path, "tsg_http", g_proxy_rt->local_logger);
if (!g_proxy_rt->send_logger)
{
goto error_out;
@@ -1160,7 +1100,6 @@ int proxy_http_init(struct tfe_proxy * proxy)
g_proxy_rt->fs_handle = tfe_proxy_get_fs_handle();
proxy_http_stat_init(g_proxy_rt);
g_proxy_rt->ratelimiter=ratelimit_handle_create(profile_path, "ratelimit");
if(http_lua_handle_create(&g_proxy_rt->lua_script, g_proxy_rt->thread_num, "tfe") <0)
{
goto error_out;
@@ -1183,22 +1122,8 @@ int proxy_http_init(struct tfe_proxy * proxy)
MESA_load_profile_string_def(profile_path, "TEMPLATE", "PAGE_451", page_path, sizeof(page_path),
"./resource/pangu/HTTP451.html");
g_proxy_rt->tpl_451 = ctemplate::Template::GetTemplate(page_path, ctemplate::DO_NOT_STRIP);
g_proxy_rt->enable_rate=0;
MESA_load_profile_int_def(profile_path, "TANGO_CACHE", "suspend_max", &(temp), 1024*1024);
g_proxy_rt->suspend_max=temp;
MESA_load_profile_int_def(profile_path, "TANGO_CACHE", "enable_cache", &(g_proxy_rt->cache_enabled), 1);
if(g_proxy_rt->cache_enabled)
{
g_proxy_rt->cache = create_web_cache_handle(profile_path, "TANGO_CACHE", g_proxy_rt->gc_evbase,
g_proxy_rt->feather, g_proxy_rt->local_logger);
if(!g_proxy_rt->cache)
{
TFE_LOG_INFO(NULL, "Tango Cache init failed.");
goto error_out;
}
TFE_LOG_INFO(NULL, "Tango Cache Enabled.");
}
TFE_LOG_INFO(NULL, "Tsg_pxy HTTP init success.");
return 0;
@@ -1250,7 +1175,6 @@ struct proxy_http_ctx
long long result[MAX_SCAN_RESULT];
struct maat_state *scan_mid;
struct maat_stream *sp;
struct cache_mid* cmid;
struct maat_rule_t * enforce_rules;
size_t n_enforce;
struct policy_action_param *param;
@@ -1267,15 +1191,7 @@ struct proxy_http_ctx
int (* resumed_cb)(const struct tfe_stream * stream,
const struct tfe_http_session * session, enum tfe_http_event event, const unsigned char * data,
size_t datalen, unsigned int thread_id, struct proxy_http_ctx* ctx);
enum cache_pending_result pending_result;
struct future *f_cache_pending, *f_cache_query;
struct tfe_http_session * ref_session;
struct tfe_http_half* cache_revalidate_req;
struct tfe_http_half* cached_response;
size_t cache_result_declared_sz, cache_result_actual_sz;
struct cache_write_context* cache_write_ctx;
int cache_wirte_result;
size_t c2s_byte_num;
size_t s2c_byte_num;
@@ -1398,34 +1314,7 @@ static void proxy_http_ctx_free(struct proxy_http_ctx * ctx)
maat_stream_free(ctx->sp);
ctx->sp=NULL;
}
if(ctx->cache_write_ctx)
{
web_cache_write_end(ctx->cache_write_ctx);
ctx->cache_write_ctx=NULL;
}
//On session recycle
if(ctx->cache_revalidate_req)
{
tfe_http_half_free(ctx->cache_revalidate_req);
ctx->cache_revalidate_req=NULL;
}
if(ctx->cached_response)
{
//Dirty close
ctx->cached_response=NULL;
}
if(ctx->f_cache_query)
{
future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL;
}
if(ctx->f_cache_pending)
{
future_destroy(ctx->f_cache_pending);
ctx->f_cache_pending=NULL;
}
if(ctx->log_req_body)
{
evbuffer_free(ctx->log_req_body);
@@ -1472,7 +1361,7 @@ static unsigned long long try_send_by_token(int inject_sz)
{
return 1;
}
return Ratelimiter_customer_factory(g_proxy_rt->ratelimiter, inject_sz);
return inject_sz;
}
static int pangu_action_weight[__PX_ACTION_MAX] = {0};
@@ -2875,248 +2764,6 @@ void enforce_control_policy(const struct tfe_stream * stream, const struct tfe_h
return;
}
#define RESUMED_CB_NO_MORE_CALLS 0
#define RESUMED_CB_MORE_CALLS 1
int make_revalidate_request(const struct tfe_stream * stream, const struct tfe_http_session * session,
enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, unsigned int thread_id, struct proxy_http_ctx * ctx)
{
assert(ctx->cache_revalidate_req);
if(events & EV_HTTP_REQ_BODY_BEGIN)
{
tfe_http_half_write_body_begin(ctx->cache_revalidate_req, 1);
}
if(events & EV_HTTP_REQ_BODY_CONT)
{
tfe_http_half_write_body_data(ctx->cache_revalidate_req, body_frag, frag_size);
}
if(events & EV_HTTP_REQ_BODY_END)
{
tfe_http_half_write_body_end(ctx->cache_revalidate_req);
ctx->cache_revalidate_req=NULL;
return RESUMED_CB_NO_MORE_CALLS;
}
if(events & EV_HTTP_REQ_END && ctx->cache_revalidate_req)
{
ctx->cache_revalidate_req=NULL;
return RESUMED_CB_NO_MORE_CALLS;
}
return RESUMED_CB_MORE_CALLS;
}
int dummy_resume(const struct tfe_stream * stream, const struct tfe_http_session * session,
enum tfe_http_event events, const unsigned char * body_frag, size_t frag_size, unsigned int thread_id, struct proxy_http_ctx * ctx)
{
return RESUMED_CB_NO_MORE_CALLS;
}
static void cache_read_on_succ(future_result_t * result, void * user)
{
struct proxy_http_ctx * ctx = (struct proxy_http_ctx *)user;
const struct cached_meta* meta=NULL;
enum cache_query_result_type type=cache_query_result_get_type(result);
const unsigned char* data=NULL;
size_t data_sz=0;
char temp[TFE_STRING_MAX];
switch(type)
{
case CACHE_QUERY_RESULT_META:
meta=cache_query_result_read_meta(result);
ctx->cache_result_declared_sz=meta->content_length;
ctx->resumed_cb=dummy_resume;
tfe_http_session_resume(ctx->ref_session);
ATOMIC_DEC(&(g_proxy_rt->stat_val[STAT_SUSPENDING]));
ctx->cached_response=tfe_http_session_response_create(ctx->ref_session, 200);
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_TYPE, meta->content_type);
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_LAST_MODIFIED, meta->last_modified);
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_ETAG, meta->etag);
tfe_http_nonstd_field_write(ctx->cached_response, "X-TG-Cache-Lookup", "HIT");
snprintf(temp, sizeof(temp), "%lu", meta->content_length);
tfe_http_std_field_write(ctx->cached_response, TFE_HTTP_CONT_LENGTH, temp);
//Dirty code here.
tfe_http_session_response_set(ctx->ref_session, ctx->cached_response);
//From now, ownership of cached_response has been transfered to http session,
//bussines plugin only hold this pointer as an reference for writing response body.
tfe_http_half_write_body_begin(ctx->cached_response, 1);
meta=NULL;
break;
case CACHE_QUERY_RESULT_DATA:
data_sz=cache_query_result_get_data(result, &data);
tfe_http_half_write_body_data(ctx->cached_response, data, data_sz);
ctx->cache_result_actual_sz+=data_sz;
break;
case CACHE_QUERY_RESULT_END:
assert(ctx->cached_response!=NULL);
tfe_http_half_write_body_end(ctx->cached_response);
//ownership has been transferred to http session, set to NULL.
ctx->pending_result=PENDING_RESULT_HIT;
ctx->cached_response=NULL;
assert(ctx->cache_result_actual_sz==ctx->cache_result_declared_sz);
future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL;
break;
case CACHE_QUERY_RESULT_MISS:
ctx->pending_result=PENDING_RESULT_MISS;
ctx->resumed_cb=dummy_resume;
tfe_http_session_resume(ctx->ref_session);
ATOMIC_DEC(&(g_proxy_rt->stat_val[STAT_SUSPENDING]));
future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL;
break;
default:
break;
}
return;
}
static void cache_read_on_fail(enum e_future_error err, const char * what, void * user)
{
struct proxy_http_ctx * ctx = (struct proxy_http_ctx *)user;
future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL;
if(!ctx->cached_response)
{
tfe_http_session_resume(ctx->ref_session);
ctx->resumed_cb=dummy_resume;
ATOMIC_DEC(&(g_proxy_rt->stat_val[STAT_SUSPENDING]));
}
else
{
tfe_http_half_write_body_end(ctx->cached_response);
ctx->cached_response=NULL;
}
ctx->pending_result=PENDING_RESULT_MISS;
printf("cache query failed: %s %s\n", ctx->ref_session->req->req_spec.url, what);
}
static void cache_pend_on_succ(future_result_t * result, void * user)
{
struct proxy_http_ctx * ctx = (struct proxy_http_ctx *)user;
const struct cached_meta* meta=NULL;
meta=cache_pending_result_read_meta(result, ctx->cmid);
ctx->resumed_cb=dummy_resume;
tfe_http_session_resume(ctx->ref_session);
ATOMIC_DEC(&(g_proxy_rt->stat_val[STAT_SUSPENDING]));
future_destroy(ctx->f_cache_pending);
ctx->f_cache_pending=NULL;
if(meta==NULL)
{
ctx->pending_result = PENDING_RESULT_MISS;
return;
}
if( meta->etag==NULL && meta->last_modified==NULL)
{
ctx->pending_result = PENDING_RESULT_MISS;
return;
}
ctx->pending_result=PENDING_RESULT_REVALIDATE;
struct http_field_name in_field_name;
const char * in_header_value = NULL;
void * iterator = NULL;
ctx->cache_revalidate_req=tfe_http_session_request_create(ctx->ref_session,
ctx->ref_session->req->req_spec.method, ctx->ref_session->req->req_spec.uri);
while (true)
{
if ((in_header_value = tfe_http_field_iterate(ctx->ref_session->req, &iterator, &in_field_name)) == NULL)
{
break;
}
if(in_field_name.field_id==TFE_HTTP_IF_MATCH || in_field_name.field_id==TFE_HTTP_IF_NONE_MATCH
|| in_field_name.field_id==TFE_HTTP_IF_MODIFIED_SINCE
|| in_field_name.field_id==TFE_HTTP_IF_UNMODIFIED_SINCE)
{
continue;
}
tfe_http_field_write(ctx->cache_revalidate_req, &in_field_name, in_header_value);
}
if(meta->etag) tfe_http_std_field_write(ctx->cache_revalidate_req, TFE_HTTP_IF_NONE_MATCH, meta->etag);
if(meta->last_modified) tfe_http_std_field_write(ctx->cache_revalidate_req, TFE_HTTP_IF_MODIFIED_SINCE, meta->last_modified);
tfe_http_session_request_set(ctx->ref_session, ctx->cache_revalidate_req);
ctx->resumed_cb=make_revalidate_request;
return;
}
static void cache_pend_on_fail(enum e_future_error err, const char * what, void * user)
{
struct proxy_http_ctx * ctx = (struct proxy_http_ctx *)user;
ctx->pending_result=PENDING_RESULT_FOBIDDEN;
tfe_http_session_resume(ctx->ref_session);
ATOMIC_DEC(&(g_proxy_rt->stat_val[STAT_SUSPENDING]));
ctx->resumed_cb=dummy_resume;
future_destroy(ctx->f_cache_pending);
ctx->f_cache_pending=NULL;
return;
}
void cache_pend(const struct tfe_http_session * session, unsigned int thread_id, struct proxy_http_ctx * ctx)
{
if(g_proxy_rt->stat_val[STAT_SUSPENDING]>g_proxy_rt->suspend_max)
{
ctx->pending_result=PENDING_RESULT_FOBIDDEN;
return;
}
ctx->f_cache_pending=future_create("cache_pend", cache_pend_on_succ, cache_pend_on_fail, ctx);
ctx->ref_session=tfe_http_session_allow_write(session);
ctx->pending_result=web_cache_async_pending(g_proxy_rt->cache, thread_id, session->req, &(ctx->cmid), ctx->f_cache_pending);
switch(ctx->pending_result)
{
case PENDING_RESULT_REVALIDATE:
tfe_http_session_suspend(ctx->ref_session);
ATOMIC_INC(&(g_proxy_rt->stat_val[STAT_SUSPENDING]));
break;
case PENDING_RESULT_ALLOWED:
case PENDING_RESULT_FOBIDDEN:
case PENDING_RESULT_MISS:
future_destroy(ctx->f_cache_pending);
ctx->f_cache_pending=NULL;
break;
default:
break;
}
return;
}
void cache_read(const struct tfe_http_session * session, unsigned int thread_id, struct proxy_http_ctx * ctx)
{
if(g_proxy_rt->stat_val[STAT_SUSPENDING]>g_proxy_rt->suspend_max)
{
return;
}
ctx->f_cache_query=future_create("cache_read", cache_read_on_succ, cache_read_on_fail, ctx);
int ret=web_cache_async_read(g_proxy_rt->cache, thread_id, session->req, &(ctx->cmid), ctx->f_cache_query);
if(ret==0)
{
ctx->ref_session=tfe_http_session_allow_write(session);
tfe_http_session_suspend(ctx->ref_session);
ATOMIC_INC(&(g_proxy_rt->stat_val[STAT_SUSPENDING]));
}
else
{
future_destroy(ctx->f_cache_query);
ctx->f_cache_query=NULL;
}
}
void cache_write(const struct tfe_http_session * session, enum tfe_http_event events,
const unsigned char * body_frag, size_t frag_size,
unsigned int thread_id, struct proxy_http_ctx * ctx)
{
if(events & EV_HTTP_RESP_BODY_BEGIN)
{
ctx->cache_write_ctx=web_cache_write_start(g_proxy_rt->cache, thread_id, session, &(ctx->cmid));
}
if(events & EV_HTTP_RESP_BODY_CONT && ctx->cache_write_ctx!=NULL)
{
web_cache_write(ctx->cache_write_ctx, body_frag, frag_size);
}
if(events & EV_HTTP_RESP_BODY_END && ctx->cache_write_ctx!=NULL)
{
ctx->cache_wirte_result=web_cache_write_end(ctx->cache_write_ctx);
ctx->cache_write_ctx=NULL;
}
}
void proxy_on_http_begin(const struct tfe_stream *stream, const struct tfe_http_session *session, unsigned int thread_id, void **pme)
{
@@ -3387,11 +3034,6 @@ void proxy_on_http_end(const struct tfe_stream * stream,
ATOMIC_INC(&(g_proxy_rt->stat_val[STAT_ACTION_RUN_SCRIPT]));
}
TFE_LOG_DEBUG(g_proxy_rt->local_logger, "cache %s %s upload=%d",
session->req->req_spec.url,
cache_pending_result_string(ctx->pending_result),
ctx->cache_wirte_result);
cache_mid_clear(&(ctx->cmid));
proxy_http_ctx_free(ctx);
*pme = NULL;
return;
@@ -3418,30 +3060,6 @@ int proxy_on_http_data(const struct tfe_stream * stream, const struct tfe_http_s
}
enforce_control_policy(stream, session, events, body_frag, frag_size,thread_id, ctx);
if(g_proxy_rt->cache_enabled && ctx->action == PX_ACTION_NONE)
{
if(events & EV_HTTP_REQ_HDR)
{
cache_pend(session, thread_id, ctx);
if(ctx->pending_result==PENDING_RESULT_ALLOWED)
{
cache_read(session, thread_id, ctx);
}
}
if(events & EV_HTTP_RESP_HDR && ctx->pending_result==PENDING_RESULT_REVALIDATE)
{
if(session->resp->resp_spec.resp_code==TFE_HTTP_STATUS_NOT_MODIFIED)
{
cache_read(session, thread_id, ctx);
}
}
if(tfe_http_in_response(events) && ctx->pending_result==PENDING_RESULT_MISS)
{
cache_write(session, events, body_frag, frag_size, thread_id, ctx);
}
}
return NO_CALL_NEXT_PLUGIN;
}